6060import org .apache .beam .sdk .extensions .avro .coders .AvroCoder ;
6161import org .apache .beam .sdk .io .parquet .ParquetIO ;
6262import org .apache .beam .sdk .testing .TestPipeline ;
63+ import org .apache .beam .sdk .testing .TestPipelineExtension ;
6364import org .apache .beam .sdk .transforms .Create ;
6465import org .apache .beam .sdk .transforms .DoFn ;
6566import org .apache .beam .sdk .transforms .GroupIntoBatches ;
7071import org .apache .beam .sdk .transforms .WithKeys ;
7172import org .apache .beam .sdk .values .KV ;
7273import org .apache .beam .sdk .values .PCollection ;
73- import org .checkerframework .checker .nullness .qual .NonNull ;
74- import org .junit .ClassRule ;
75- import org .junit .Rule ;
76- import org .junit .Test ;
74+ import org .junit .jupiter .api .Test ;
75+ import org .junit .jupiter .api .extension .ExtendWith ;
7776import org .neo4j .cypherdsl .core .Cypher ;
7877import org .neo4j .cypherdsl .core .Expression ;
7978import org .neo4j .cypherdsl .core .MapExpression ;
102101import org .neo4j .importer .v1 .sources .Source ;
103102import org .neo4j .importer .v1 .sources .SourceProvider ;
104103import org .neo4j .importer .v1 .targets .PropertyType ;
105- import org .testcontainers .containers .Neo4jContainer ;
104+ import org .testcontainers .junit .jupiter .Container ;
105+ import org .testcontainers .junit .jupiter .Testcontainers ;
106+ import org .testcontainers .neo4j .Neo4jContainer ;
106107import org .testcontainers .utility .DockerImageName ;
107108
109+ @ ExtendWith (TestPipelineExtension .class )
110+ @ Testcontainers
108111public class BeamExampleIT {
109112
110- @ Rule
111- public final TestPipeline pipeline = TestPipeline .create ();
112-
113- @ ClassRule
114- public static Neo4jContainer <?> NEO4J = new Neo4jContainer <>(DockerImageName .parse ("neo4j:5-enterprise" ))
113+ @ Container
114+ public static Neo4jContainer NEO4J = new Neo4jContainer (DockerImageName .parse ("neo4j:5-enterprise" ))
115115 .withEnv ("NEO4J_ACCEPT_LICENSE_AGREEMENT" , "yes" )
116116 .withAdminPassword ("letmein!" );
117117
118118 @ Test
119- public void imports_dvd_rental_data_set () throws Exception {
119+ public void imports_dvd_rental_data_set (TestPipeline pipeline ) throws Exception {
120120 try (InputStream stream = this .getClass ().getResourceAsStream ("/specs/dvd_rental.yaml" )) {
121121 assertThat (stream ).isNotNull ();
122122
@@ -125,9 +125,9 @@ public void imports_dvd_rental_data_set() throws Exception {
125125 var importPipeline = ImportPipeline .of (ImportSpecificationDeserializer .deserialize (reader ));
126126 importPipeline .forEach (step -> {
127127 switch (step ) {
128- case SourceStep source -> handleSource (source , outputs );
129- case ActionStep action -> handleAction (action , outputs );
130- case TargetStep target -> handleTarget (target , outputs );
128+ case SourceStep source -> handleSource (source , pipeline , outputs );
129+ case ActionStep action -> handleAction (action , pipeline , outputs );
130+ case TargetStep target -> handleTarget (target , pipeline , outputs );
131131 default -> throw new IllegalStateException ("Unexpected value: " + step );
132132 }
133133 });
@@ -152,7 +152,7 @@ public void imports_dvd_rental_data_set() throws Exception {
152152 }
153153 }
154154
155- private void handleSource (SourceStep step , Map <String , PCollection <?>> outputs ) {
155+ private void handleSource (SourceStep step , TestPipeline pipeline , Map <String , PCollection <?>> outputs ) {
156156 var name = step .name ();
157157 var source = step .source ();
158158 assertThat (source ).isInstanceOf (ParquetSource .class );
@@ -165,7 +165,7 @@ private void handleSource(SourceStep step, Map<String, PCollection<?>> outputs)
165165 outputs .put (source .getName (), output );
166166 }
167167
168- private void handleAction (ActionStep step , Map <String , PCollection <?>> outputs ) {
168+ private void handleAction (ActionStep step , TestPipeline pipeline , Map <String , PCollection <?>> outputs ) {
169169 var actionName = step .name ();
170170 var action = step .action ();
171171 assertThat (action ).isInstanceOf (CypherAction .class );
@@ -183,7 +183,7 @@ private void handleAction(ActionStep step, Map<String, PCollection<?>> outputs)
183183 }
184184
185185 @ SuppressWarnings ("unchecked" )
186- private void handleTarget (TargetStep step , Map <String , PCollection <?>> outputs ) {
186+ private void handleTarget (TargetStep step , TestPipeline pipeline , Map <String , PCollection <?>> outputs ) {
187187 var stepName = step .name ();
188188 assertThat (step ).isInstanceOf (EntityTargetStep .class );
189189 var entityTargetStep = (EntityTargetStep ) step ;
@@ -351,8 +351,7 @@ public String getName() {
351351 }
352352 }
353353
354- private static class TargetSchemaIO
355- extends PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <WriteCounters >> {
354+ private static class TargetSchemaIO extends PTransform <PCollection <Integer >, PCollection <WriteCounters >> {
356355
357356 private final String url ;
358357
@@ -366,13 +365,13 @@ private TargetSchemaIO(String url, String password, EntityTargetStep target) {
366365 this .target = target ;
367366 }
368367
369- public static PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <WriteCounters >> initSchema (
368+ public static PTransform <PCollection <Integer >, PCollection <WriteCounters >> initSchema (
370369 String url , String password , EntityTargetStep target ) {
371370 return new TargetSchemaIO (url , password , target );
372371 }
373372
374373 @ Override
375- public @ NonNull PCollection <WriteCounters > expand (@ NonNull PCollection <Integer > input ) {
374+ public PCollection <WriteCounters > expand (PCollection <Integer > input ) {
376375 return input .apply (ParDo .of (TargetSchemaWriteFn .of (url , password , target )));
377376 }
378377
@@ -556,8 +555,7 @@ private static String propertyType(PropertyType propertyType) {
556555 }
557556
558557 private static class TargetIO
559- extends PTransform <
560- @ NonNull PCollection <KV <Integer , Iterable <GenericRecord >>>, @ NonNull PCollection <WriteCounters >> {
558+ extends PTransform <PCollection <KV <Integer , Iterable <GenericRecord >>>, PCollection <WriteCounters >> {
561559
562560 private final String url ;
563561
@@ -571,14 +569,13 @@ private TargetIO(String url, String password, EntityTargetStep target) {
571569 this .target = target ;
572570 }
573571
574- public static PTransform <
575- @ NonNull PCollection <KV <Integer , Iterable <GenericRecord >>>, @ NonNull PCollection <WriteCounters >>
572+ public static PTransform <PCollection <KV <Integer , Iterable <GenericRecord >>>, PCollection <WriteCounters >>
576573 writeAll (String boltUrl , String adminPassword , EntityTargetStep target ) {
577574 return new TargetIO (boltUrl , adminPassword , target );
578575 }
579576
580577 @ Override
581- public @ NonNull PCollection <WriteCounters > expand (PCollection <KV <Integer , Iterable <GenericRecord >>> input ) {
578+ public PCollection <WriteCounters > expand (PCollection <KV <Integer , Iterable <GenericRecord >>> input ) {
582579 return input .apply (ParDo .of (TargetWriteFn .of (url , password , target )));
583580 }
584581
@@ -778,7 +775,7 @@ public static Coder<GenericRecord> create() {
778775 }
779776
780777 @ Override
781- public void encode (GenericRecord value , @ NonNull OutputStream outStream ) throws IOException {
778+ public void encode (GenericRecord value , OutputStream outStream ) throws IOException {
782779 assertThat (value ).isNotNull ();
783780 var schema = value .getSchema ();
784781 String schemaString = schema .toString ();
@@ -790,7 +787,7 @@ public void encode(GenericRecord value, @NonNull OutputStream outStream) throws
790787 }
791788
792789 @ Override
793- public GenericRecord decode (@ NonNull InputStream inStream ) throws IOException {
790+ public GenericRecord decode (InputStream inStream ) throws IOException {
794791 String schemaString = StringUtf8Coder .of ().decode (inStream );
795792 String schemaHash = StringUtf8Coder .of ().decode (inStream );
796793 AvroCoder <GenericRecord > coder =
@@ -1009,8 +1006,7 @@ public String toString() {
10091006 }
10101007 }
10111008
1012- private static class CypherActionIO
1013- extends PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <Integer >> {
1009+ private static class CypherActionIO extends PTransform <PCollection <Integer >, PCollection <Integer >> {
10141010
10151011 private final CypherAction action ;
10161012
@@ -1024,13 +1020,13 @@ private CypherActionIO(CypherAction action, String url, String password) {
10241020 this .password = password ;
10251021 }
10261022
1027- public static PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <Integer >> run (
1023+ public static PTransform <PCollection <Integer >, PCollection <Integer >> run (
10281024 CypherAction action , String url , String password ) {
10291025 return new CypherActionIO (action , url , password );
10301026 }
10311027
10321028 @ Override
1033- public @ NonNull PCollection <Integer > expand (@ NonNull PCollection <Integer > input ) {
1029+ public PCollection <Integer > expand (PCollection <Integer > input ) {
10341030 return input .apply (ParDo .of (new CypherActionFn (action , url , password )));
10351031 }
10361032 }
0 commit comments