Skip to content

Commit dd81e0c

Browse files
committed
NIFI-8269 - Add support for schema inference in ForkRecord processor when extracting array records
1 parent 64fa7c5 commit dd81e0c

File tree

7 files changed

+630
-63
lines changed

7 files changed

+630
-63
lines changed

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,10 @@
624624
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
625625
<exclude>src/test/resources/TestExtractRecordSchema/name_age_schema.avsc</exclude>
626626
<exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude>
627+
<exclude>src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json</exclude>
628+
<exclude>src/test/resources/TestForkRecord/output/extract-address-without-parents.json</exclude>
629+
<exclude>src/test/resources/TestForkRecord/output/extract-address-with-parents.json</exclude>
630+
<exclude>src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json</exclude>
627631
<exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude>
628632
<exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude>
629633
<exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java

Lines changed: 170 additions & 63 deletions
Large diffs are not rendered by default.

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,96 @@ public void testExtractMode() throws InitializationException, IOException {
451451
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "6");
452452
}
453453

454+
@Test
455+
public void testExtractWithParentFieldsAndInferredSchema() throws Exception {
456+
TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
457+
458+
final JsonTreeReader jsonReader = new JsonTreeReader();
459+
runner.addControllerService("record-reader", jsonReader);
460+
runner.enableControllerService(jsonReader);
461+
462+
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
463+
runner.addControllerService("record-writer", jsonWriter);
464+
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
465+
runner.enableControllerService(jsonWriter);
466+
467+
runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
468+
runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
469+
runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
470+
runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
471+
runner.setProperty("bankAccounts", "/bankAccounts");
472+
473+
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
474+
runner.run();
475+
476+
runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
477+
runner.assertTransferCount(ForkRecord.REL_FORK, 1);
478+
479+
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-bank-accounts-with-parents.json")));
480+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5");
481+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
482+
}
483+
484+
@Test
485+
public void testExtractFieldsAndInferredSchema() throws Exception {
486+
TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
487+
488+
final JsonTreeReader jsonReader = new JsonTreeReader();
489+
runner.addControllerService("record-reader", jsonReader);
490+
runner.enableControllerService(jsonReader);
491+
492+
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
493+
runner.addControllerService("record-writer", jsonWriter);
494+
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
495+
runner.enableControllerService(jsonWriter);
496+
497+
runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
498+
runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
499+
runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
500+
runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false");
501+
runner.setProperty("address", "/address");
502+
503+
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
504+
runner.run();
505+
506+
runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
507+
runner.assertTransferCount(ForkRecord.REL_FORK, 1);
508+
509+
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-without-parents.json")));
510+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5");
511+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
512+
}
513+
514+
@Test
515+
public void testExtractFieldsWithParentsAndFieldConflictAndInferredSchema() throws Exception {
516+
TestRunner runner = TestRunners.newTestRunner(new ForkRecord());
517+
518+
final JsonTreeReader jsonReader = new JsonTreeReader();
519+
runner.addControllerService("record-reader", jsonReader);
520+
runner.enableControllerService(jsonReader);
521+
522+
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
523+
runner.addControllerService("record-writer", jsonWriter);
524+
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
525+
runner.enableControllerService(jsonWriter);
526+
527+
runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
528+
runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
529+
runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
530+
runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
531+
runner.setProperty("address", "/address");
532+
533+
runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json-for-inference.json"));
534+
runner.run();
535+
536+
runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
537+
runner.assertTransferCount(ForkRecord.REL_FORK, 1);
538+
539+
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-address-with-parents.json")));
540+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertAttributeEquals("record.count", "5");
541+
runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0).assertContentEquals(expectedOutput);
542+
}
543+
454544
private class JsonRecordReader extends AbstractControllerService implements RecordReaderFactory {
455545

456546
private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
[
2+
{
3+
"id": 1,
4+
"name": {
5+
"last": "Doe",
6+
"first": "John"
7+
},
8+
"address": [
9+
{
10+
"id": "home",
11+
"street": "1 nifi street",
12+
"city": "nifi city"
13+
}
14+
],
15+
"bankAccounts": [
16+
{
17+
"bankID": "OneBank",
18+
"IBAN": "OneIBAN",
19+
"last5Transactions": [
20+
{
21+
"comment": "food",
22+
"amount": "-450"
23+
}
24+
]
25+
}
26+
]
27+
}, {
28+
"id": 2,
29+
"name": {
30+
"last": "Smith",
31+
"first": "John"
32+
},
33+
"address": [null],
34+
"bankAccounts": null
35+
}, {
36+
"id": 3,
37+
"name": {
38+
"last": "Smith",
39+
"first": "Jane"
40+
},
41+
"address": [
42+
{
43+
"id": "home",
44+
"street": "1 nifi street",
45+
"city": "nifi city"
46+
}, {
47+
"id": "work",
48+
"street": "1 nifi avenue",
49+
"city": "apache city"
50+
}
51+
],
52+
"bankAccounts": [
53+
{
54+
"bankID": "nifi bank",
55+
"IBAN": "myIBAN",
56+
"last5Transactions": null
57+
}, {
58+
"bankID": "apache bank",
59+
"IBAN": "myIBAN",
60+
"last5Transactions": [
61+
{
62+
"comment": "gas station",
63+
"amount": "-45"
64+
}, {
65+
"comment": "hair cut",
66+
"amount": "-19"
67+
}
68+
]
69+
}
70+
]
71+
}, {
72+
"id": 4,
73+
"name": {
74+
"last": "Clark",
75+
"first": "Jane"
76+
},
77+
"address": [
78+
{
79+
"id": "home",
80+
"street": "10 nifi street",
81+
"city": "nifi city"
82+
}, {
83+
"id": "work",
84+
"street": "10 nifi avenue",
85+
"city": "apache city"
86+
}
87+
],
88+
"bankAccounts": [
89+
{
90+
"bankID": "nifi bank",
91+
"IBAN": "myIBAN",
92+
"last5Transactions": [
93+
{
94+
"comment": "gift",
95+
"amount": "+100"
96+
}, {
97+
"comment": "flights",
98+
"amount": "-190"
99+
}
100+
]
101+
}, {
102+
"bankID": "apache bank",
103+
"IBAN": "myIBAN",
104+
"last5Transactions": [
105+
{
106+
"comment": "nifi tshirt",
107+
"amount": "0"
108+
}, {
109+
"comment": "theatre",
110+
"amount": "-19"
111+
}
112+
]
113+
}
114+
]
115+
}
116+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
[ {
2+
"id" : "home",
3+
"street" : "1 nifi street",
4+
"city" : "nifi city",
5+
"name" : {
6+
"last" : "Doe",
7+
"first" : "John"
8+
},
9+
"bankAccounts" : [ {
10+
"bankID" : "OneBank",
11+
"IBAN" : "OneIBAN",
12+
"last5Transactions" : [ {
13+
"comment" : "food",
14+
"amount" : "-450"
15+
} ]
16+
} ]
17+
}, {
18+
"id" : "home",
19+
"street" : "1 nifi street",
20+
"city" : "nifi city",
21+
"name" : {
22+
"last" : "Smith",
23+
"first" : "Jane"
24+
},
25+
"bankAccounts" : [ {
26+
"bankID" : "nifi bank",
27+
"IBAN" : "myIBAN",
28+
"last5Transactions" : null
29+
}, {
30+
"bankID" : "apache bank",
31+
"IBAN" : "myIBAN",
32+
"last5Transactions" : [ {
33+
"comment" : "gas station",
34+
"amount" : "-45"
35+
}, {
36+
"comment" : "hair cut",
37+
"amount" : "-19"
38+
} ]
39+
} ]
40+
}, {
41+
"id" : "work",
42+
"street" : "1 nifi avenue",
43+
"city" : "apache city",
44+
"name" : {
45+
"last" : "Smith",
46+
"first" : "Jane"
47+
},
48+
"bankAccounts" : [ {
49+
"bankID" : "nifi bank",
50+
"IBAN" : "myIBAN",
51+
"last5Transactions" : null
52+
}, {
53+
"bankID" : "apache bank",
54+
"IBAN" : "myIBAN",
55+
"last5Transactions" : [ {
56+
"comment" : "gas station",
57+
"amount" : "-45"
58+
}, {
59+
"comment" : "hair cut",
60+
"amount" : "-19"
61+
} ]
62+
} ]
63+
}, {
64+
"id" : "home",
65+
"street" : "10 nifi street",
66+
"city" : "nifi city",
67+
"name" : {
68+
"last" : "Clark",
69+
"first" : "Jane"
70+
},
71+
"bankAccounts" : [ {
72+
"bankID" : "nifi bank",
73+
"IBAN" : "myIBAN",
74+
"last5Transactions" : [ {
75+
"comment" : "gift",
76+
"amount" : "+100"
77+
}, {
78+
"comment" : "flights",
79+
"amount" : "-190"
80+
} ]
81+
}, {
82+
"bankID" : "apache bank",
83+
"IBAN" : "myIBAN",
84+
"last5Transactions" : [ {
85+
"comment" : "nifi tshirt",
86+
"amount" : "0"
87+
}, {
88+
"comment" : "theatre",
89+
"amount" : "-19"
90+
} ]
91+
} ]
92+
}, {
93+
"id" : "work",
94+
"street" : "10 nifi avenue",
95+
"city" : "apache city",
96+
"name" : {
97+
"last" : "Clark",
98+
"first" : "Jane"
99+
},
100+
"bankAccounts" : [ {
101+
"bankID" : "nifi bank",
102+
"IBAN" : "myIBAN",
103+
"last5Transactions" : [ {
104+
"comment" : "gift",
105+
"amount" : "+100"
106+
}, {
107+
"comment" : "flights",
108+
"amount" : "-190"
109+
} ]
110+
}, {
111+
"bankID" : "apache bank",
112+
"IBAN" : "myIBAN",
113+
"last5Transactions" : [ {
114+
"comment" : "nifi tshirt",
115+
"amount" : "0"
116+
}, {
117+
"comment" : "theatre",
118+
"amount" : "-19"
119+
} ]
120+
} ]
121+
} ]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[ {
2+
"id" : "home",
3+
"street" : "1 nifi street",
4+
"city" : "nifi city"
5+
}, {
6+
"id" : "home",
7+
"street" : "1 nifi street",
8+
"city" : "nifi city"
9+
}, {
10+
"id" : "work",
11+
"street" : "1 nifi avenue",
12+
"city" : "apache city"
13+
}, {
14+
"id" : "home",
15+
"street" : "10 nifi street",
16+
"city" : "nifi city"
17+
}, {
18+
"id" : "work",
19+
"street" : "10 nifi avenue",
20+
"city" : "apache city"
21+
} ]

0 commit comments

Comments
 (0)