Skip to content

Commit 0b74d50

Browse files
bryanckAcehaidrey
andauthored
Spark: Fix changelog table bug for start time older than current snapshot (#11564) (#11613)
Co-authored-by: Ace Haidrey <[email protected]>
1 parent 9e6595f commit 0b74d50

File tree

2 files changed

+52
-0
lines changed

2 files changed

+52
-0
lines changed

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,4 +408,52 @@ private List<Row> collect(DataFrameReader reader) {
408408
.orderBy("_change_ordinal", "_commit_snapshot_id", "_change_type", "id")
409409
.collectAsList();
410410
}
411+
412+
@TestTemplate
413+
public void testChangelogViewOutsideTimeRange() {
414+
createTableWithDefaultRows();
415+
416+
// Insert new records
417+
sql("INSERT INTO %s VALUES (3, 'c')", tableName);
418+
sql("INSERT INTO %s VALUES (4, 'd')", tableName);
419+
420+
Table table = validationCatalog.loadTable(tableIdent);
421+
Snapshot insertSnapshot = table.currentSnapshot();
422+
423+
// Get timestamp after inserts but before our changelog window
424+
long beforeWindowTime = System.currentTimeMillis();
425+
426+
// Small delay to ensure our timestamps are different
427+
try {
428+
Thread.sleep(100);
429+
} catch (InterruptedException e) {
430+
throw new RuntimeException("Test interrupted", e);
431+
}
432+
433+
long startTime = System.currentTimeMillis();
434+
long endTime = startTime + 1000; // 1 second window
435+
436+
// Create changelog view for a time window after our inserts
437+
sql(
438+
"CALL %s.system.create_changelog_view("
439+
+ " table => '%s', "
440+
+ " options => map("
441+
+ " 'start-timestamp', '%d',"
442+
+ " 'end-timestamp', '%d'"
443+
+ " ),"
444+
+ " changelog_view => 'test_changelog_view'"
445+
+ ")",
446+
catalogName, tableName, startTime, endTime);
447+
448+
// Query the changelog view
449+
List<Object[]> results =
450+
sql(
451+
"SELECT * FROM test_changelog_view WHERE _change_type IN ('INSERT', 'DELETE') ORDER BY _change_ordinal");
452+
453+
// Verify no changes are returned since our window is after the inserts
454+
assertThat(results).as("Num records must be zero").isEmpty();
455+
456+
// Clean up the changelog view
457+
sql("DROP VIEW IF EXISTS test_changelog_view");
458+
}
411459
}

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,10 @@ public Scan buildChangelogScan() {
561561

562562
boolean emptyScan = false;
563563
if (startTimestamp != null) {
564+
if (table.currentSnapshot() != null
565+
&& table.currentSnapshot().timestampMillis() < startTimestamp) {
566+
emptyScan = true;
567+
}
564568
startSnapshotId = getStartSnapshotId(startTimestamp);
565569
if (startSnapshotId == null && endTimestamp == null) {
566570
emptyScan = true;

0 commit comments

Comments
 (0)