@@ -48,7 +48,6 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -63,6 +62,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 
@@ -77,6 +77,7 @@
 import io.onetable.hudi.HudiSourceClientProvider;
 import io.onetable.hudi.HudiSourceConfig;
 import io.onetable.hudi.HudiTestUtil;
+import io.onetable.iceberg.IcebergSourceClientProvider;
 import io.onetable.model.storage.TableFormat;
 import io.onetable.model.sync.SyncMode;
 
@@ -87,8 +88,6 @@ public class ITOneTableClient {
 
   private static JavaSparkContext jsc;
   private static SparkSession sparkSession;
-  private SourceClientProvider<HoodieInstant> hudiSourceClientProvider;
-  private SourceClientProvider<Long> deltaSourceClientProvider;
 
   @BeforeAll
   public static void setupOnce() {
@@ -102,14 +101,6 @@ public static void setupOnce() {
     jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
   }
 
-  @BeforeEach
-  public void setup() {
-    hudiSourceClientProvider = new HudiSourceClientProvider();
-    hudiSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
-    deltaSourceClientProvider = new DeltaSourceClientProvider();
-    deltaSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
-  }
-
   @AfterAll
   public static void teardown() {
     if (jsc != null) {
@@ -126,7 +117,8 @@ private static Stream<Arguments> testCasesWithPartitioningAndSyncModes() {
 
   private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPartitioning() {
     List<Arguments> arguments = new ArrayList<>();
-    for (TableFormat sourceTableFormat : Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) {
+    for (TableFormat sourceTableFormat :
+        Arrays.asList(TableFormat.HUDI, TableFormat.DELTA, TableFormat.ICEBERG)) {
       for (SyncMode syncMode : SyncMode.values()) {
         for (boolean isPartitioned : new boolean[] {true, false}) {
           arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned));
@@ -142,9 +134,18 @@ private static Stream<Arguments> testCasesWithSyncModes() {
 
   private SourceClientProvider<?> getSourceClientProvider(TableFormat sourceTableFormat) {
     if (sourceTableFormat == TableFormat.HUDI) {
+      SourceClientProvider<HoodieInstant> hudiSourceClientProvider = new HudiSourceClientProvider();
+      hudiSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
       return hudiSourceClientProvider;
     } else if (sourceTableFormat == TableFormat.DELTA) {
+      SourceClientProvider<Long> deltaSourceClientProvider = new DeltaSourceClientProvider();
+      deltaSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
       return deltaSourceClientProvider;
+    } else if (sourceTableFormat == TableFormat.ICEBERG) {
+      SourceClientProvider<Snapshot> icebergSourceClientProvider =
+          new IcebergSourceClientProvider();
+      icebergSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
+      return icebergSourceClientProvider;
     } else {
       throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
     }
@@ -183,6 +184,7 @@ public void testVariousOperations(
             .tableName(tableName)
             .targetTableFormats(targetTableFormats)
             .tableBasePath(table.getBasePath())
+            .tableDataPath(table.getDataPath())
             .hudiSourceConfig(
                 HudiSourceConfig.builder()
                     .partitionFieldSpecConfig(oneTablePartitionConfig)
@@ -215,6 +217,7 @@ public void testVariousOperations(
             .tableName(tableName)
             .targetTableFormats(targetTableFormats)
             .tableBasePath(tableWithUpdatedSchema.getBasePath())
+            .tableDataPath(tableWithUpdatedSchema.getDataPath())
             .hudiSourceConfig(
                 HudiSourceConfig.builder()
                     .partitionFieldSpecConfig(oneTablePartitionConfig)
@@ -254,6 +257,7 @@ public void testVariousOperations(
   public void testConcurrentInsertWritesInSource(
       SyncMode syncMode, PartitionConfig partitionConfig) {
     String tableName = getTableName();
+    SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     List<TableFormat> targetTableFormats = getOtherFormats(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
@@ -279,11 +283,11 @@ public void testConcurrentInsertWritesInSource(
             .syncMode(syncMode)
             .build();
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
 
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 50);
       table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 100);
     }
   }
@@ -293,7 +297,7 @@ public void testConcurrentInsertWritesInSource(
   public void testConcurrentInsertsAndTableServiceWrites(
       SyncMode syncMode, PartitionConfig partitionConfig) {
     HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
-
+    SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     List<TableFormat> targetTableFormats = getOtherFormats(TableFormat.HUDI);
     String tableName = getTableName();
     try (TestSparkHudiTable table =
@@ -313,15 +317,15 @@ public void testConcurrentInsertsAndTableServiceWrites(
             .syncMode(syncMode)
             .build();
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 50);
 
       table.deleteRecords(insertedRecords1.subList(0, 20), true);
       // At this point table should have 30 records but only after compaction.
       String scheduledCompactionInstant = table.onlyScheduleCompaction();
 
       table.insertRecords(50, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       Map<String, String> sourceHudiOptions =
           Collections.singletonMap("hoodie.datasource.query.type", "read_optimized");
       // Because compaction is not completed yet and read optimized query, there are 100 records.
@@ -334,7 +338,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
           100);
 
       table.insertRecords(50, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       // Because compaction is not completed yet and read optimized query, there are 150 records.
       checkDatasetEquivalence(
           TableFormat.HUDI,
@@ -345,15 +349,15 @@ public void testConcurrentInsertsAndTableServiceWrites(
           150);
 
       table.completeScheduledCompaction(scheduledCompactionInstant);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 130);
     }
   }
 
   @ParameterizedTest
   @EnumSource(
       value = TableFormat.class,
-      names = {"HUDI", "DELTA"})
+      names = {"HUDI", "DELTA", "ICEBERG"})
   public void testTimeTravelQueries(TableFormat sourceTableFormat) throws Exception {
     String tableName = getTableName();
     try (GenericTable table =
@@ -365,6 +369,7 @@ public void testTimeTravelQueries(TableFormat sourceTableFormat) throws Exceptio
             .tableName(tableName)
             .targetTableFormats(targetTableFormats)
             .tableBasePath(table.getBasePath())
+            .tableDataPath(table.getDataPath())
             .syncMode(SyncMode.INCREMENTAL)
             .build();
       SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(sourceTableFormat);
@@ -462,6 +467,7 @@ public void testPartitionedData(
       String hudiPartitionConfig,
       String filter) {
     String tableName = getTableName();
+    SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, hudiPartitionConfig, HoodieTableType.COPY_ON_WRITE)) {
@@ -478,10 +484,10 @@ public void testPartitionedData(
             .build();
       table.insertRecords(100, true);
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       // Do a second sync to force the test to read back the metadata it wrote earlier
       table.insertRecords(100, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
 
       checkDatasetEquivalenceWithFilter(TableFormat.HUDI, table, targetTableFormats, filter);
     }
@@ -491,6 +497,7 @@ public void testPartitionedData(
   @EnumSource(value = SyncMode.class)
   public void testSyncWithSingleFormat(SyncMode syncMode) {
     String tableName = getTableName();
+    SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
@@ -513,18 +520,18 @@ public void testSyncWithSingleFormat(SyncMode syncMode) {
             .build();
 
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfigIceberg, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigIceberg, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 100);
-      oneTableClient.sync(perTableConfigDelta, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigDelta, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.DELTA), 100);
 
       table.insertRecords(100, true);
-      oneTableClient.sync(perTableConfigIceberg, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigIceberg, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 200);
-      oneTableClient.sync(perTableConfigDelta, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigDelta, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.DELTA), 200);
     }
@@ -533,6 +540,7 @@ public void testSyncWithSingleFormat(SyncMode syncMode) {
   @Test
   public void testOutOfSyncIncrementalSyncs() {
     String tableName = getTableName();
+    SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
@@ -555,13 +563,13 @@ public void testOutOfSyncIncrementalSyncs() {
       table.insertRecords(50, true);
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
       // sync iceberg only
-      oneTableClient.sync(singleTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(singleTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 50);
       // insert more records
       table.insertRecords(50, true);
       // iceberg will be an incremental sync and delta will need to bootstrap with snapshot sync
-      oneTableClient.sync(dualTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(dualTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 100);
 
@@ -570,14 +578,14 @@ public void testOutOfSyncIncrementalSyncs() {
       // insert more records
       table.insertRecords(50, true);
       // incremental sync for two commits for iceberg only
-      oneTableClient.sync(singleTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(singleTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 200);
 
       // insert more records
       table.insertRecords(50, true);
       // incremental sync for one commit for iceberg and three commits for delta
-      oneTableClient.sync(dualTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(dualTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 250);
     }
@@ -586,6 +594,7 @@ public void testOutOfSyncIncrementalSyncs() {
   @Test
   public void testMetadataRetention() {
     String tableName = getTableName();
+    SourceClientProvider<?> sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
@@ -599,7 +608,7 @@ public void testMetadataRetention() {
             .build();
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
       table.insertRecords(10, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       // later we will ensure we can still read the source table at this instant to ensure that
       // neither target cleaned up the underlying parquet files in the table
       Instant instantAfterFirstCommit = Instant.now();
@@ -608,7 +617,7 @@ public void testMetadataRetention() {
           .forEach(
               unused -> {
                 table.insertRecords(10, true);
-                oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+                oneTableClient.sync(perTableConfig, sourceClientProvider);
               });
       // ensure that hudi rows can still be read and underlying files were not removed
       List<Row> rows =
@@ -729,7 +738,7 @@ private void checkDatasetEquivalence(
                     .read()
                     .options(finalTargetOptions)
                     .format(targetFormat.name().toLowerCase())
-                    .load(sourceTable.getBasePath())
+                    .load(sourceTable.getDataPath())
                     .orderBy(sourceTable.getOrderByColumn())
                     .filter(filterCondition);
               }));