
Commit e7d9ec1

Author: Jack Ye
Spark 3.3: Support write to WAP branch (#7050)
1 parent 7f20b36 commit e7d9ec1

File tree

9 files changed (+373 -4 lines changed)
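Taken together, these changes let a Spark 3.3 session stage row-level writes (DELETE, UPDATE, MERGE) on a write-audit-publish (WAP) branch: when a table sets 'write.wap.enabled' to true and the session sets spark.wap.branch, SparkWriteConf.branch() redirects commits to that branch, and SparkReadConf.branch() resolves unqualified reads to it once the ref exists, while main stays untouched. A minimal sketch of the session flow, mirroring the tests below (the table name db.tbl is illustrative):

import org.apache.spark.sql.SparkSession;

public class WapBranchFlowSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Opt the table into WAP; db.tbl is a placeholder for an existing Iceberg table.
    spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('write.wap.enabled' = 'true')");

    // Route this session's writes to the 'wap' branch instead of main.
    spark.conf().set("spark.wap.branch", "wap");
    spark.sql("DELETE FROM db.tbl WHERE id = 0"); // commits to branch 'wap'

    // Unqualified reads in this session now resolve to the WAP branch...
    spark.table("db.tbl").show();
    // ...while main is unchanged and can still be read explicitly.
    spark.table("db.tbl.branch_main").show();

    // Drop the conf so later writes commit to main again.
    spark.conf().unset("spark.wap.branch");
  }
}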

spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java

Lines changed: 69 additions & 0 deletions

@@ -47,6 +47,7 @@
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -55,6 +56,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.SparkException;
@@ -1066,6 +1068,73 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWapBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=0", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table",
+              2L,
+              spark.table(tableName).count());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch",
+              2L,
+              spark.table(tableName + ".branch_wap").count());
+          Assert.assertEquals(
+              "Should not modify main branch", 3L, spark.table(tableName + ".branch_main").count());
+        });
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=1", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table with multiple writes",
+              1L,
+              spark.table(tableName).count());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch with multiple writes",
+              1L,
+              spark.table(tableName + ".branch_wap").count());
+          Assert.assertEquals(
+              "Should not modify main branch with multiple writes",
+              3L,
+              spark.table(tableName + ".branch_main").count());
+        });
+  }
+
+  @Test
+  public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableException {
+    Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    createBranchIfNeeded();
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () ->
+            Assertions.assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0", commitTarget()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                    String.format(
+                        "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
+                        branch)));
+  }
+
   // TODO: multiple stripes for ORC
 
   protected void createAndInitPartitionedTable() {

spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java

Lines changed: 92 additions & 0 deletions

@@ -47,12 +47,14 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.SparkException;
 import org.apache.spark.sql.AnalysisException;
@@ -2448,6 +2450,96 @@ public void testMergeNonExistingBranch() {
         .hasMessage("Cannot use branch (does not exist): test");
   }
 
+  @Test
+  public void testMergeToWapBranch() {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitTable("id INT", "{\"id\": -1}");
+    ImmutableList<Object[]> originalRows = ImmutableList.of(row(-1));
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    spark.range(0, 5).coalesce(1).createOrReplaceTempView("source");
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql(
+              "MERGE INTO %s t USING source s ON t.id = s.id "
+                  + "WHEN MATCHED THEN UPDATE SET *"
+                  + "WHEN NOT MATCHED THEN INSERT *",
+              tableName);
+          assertEquals(
+              "Should have expected rows when reading table",
+              expectedRows,
+              sql("SELECT * FROM %s ORDER BY id", tableName));
+          assertEquals(
+              "Should have expected rows when reading WAP branch",
+              expectedRows,
+              sql("SELECT * FROM %s.branch_wap ORDER BY id", tableName));
+          assertEquals(
+              "Should not modify main branch",
+              originalRows,
+              sql("SELECT * FROM %s.branch_main ORDER BY id", tableName));
+        });
+
+    spark.range(3, 6).coalesce(1).createOrReplaceTempView("source2");
+    ImmutableList<Object[]> expectedRows2 =
+        ImmutableList.of(row(-1), row(0), row(1), row(2), row(5));
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql(
+              "MERGE INTO %s t USING source2 s ON t.id = s.id "
+                  + "WHEN MATCHED THEN DELETE "
+                  + "WHEN NOT MATCHED THEN INSERT *",
+              tableName);
+          assertEquals(
+              "Should have expected rows when reading table with multiple writes",
+              expectedRows2,
+              sql("SELECT * FROM %s ORDER BY id", tableName));
+          assertEquals(
+              "Should have expected rows when reading WAP branch with multiple writes",
+              expectedRows2,
+              sql("SELECT * FROM %s.branch_wap ORDER BY id", tableName));
+          assertEquals(
+              "Should not modify main branch with multiple writes",
+              originalRows,
+              sql("SELECT * FROM %s.branch_main ORDER BY id", tableName));
+        });
+  }
+
+  @Test
+  public void testMergeToWapBranchWithTableBranchIdentifier() {
+    Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);
+
+    createAndInitTable("id INT", "{\"id\": -1}");
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    spark.range(0, 5).coalesce(1).createOrReplaceTempView("source");
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () ->
+            Assertions.assertThatThrownBy(
+                    () ->
+                        sql(
+                            "MERGE INTO %s t USING source s ON t.id = s.id "
+                                + "WHEN MATCHED THEN UPDATE SET *"
+                                + "WHEN NOT MATCHED THEN INSERT *",
+                            commitTarget()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                    String.format(
+                        "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
+                        branch)));
+  }
+
   private void checkJoinAndFilterConditions(String query, String join, String icebergFilters) {
     // disable runtime filtering for easier validation
     withSQLConf(

spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java

Lines changed: 69 additions & 0 deletions

@@ -48,6 +48,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -1257,6 +1258,74 @@ public void testUpdateOnNonIcebergTableNotSupported() {
         () -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"));
   }
 
+  @Test
+  public void testUpdateToWAPBranch() {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitTable(
+        "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"a\" }");
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("UPDATE %s SET dep='hr' WHERE dep='a'", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table",
+              2L,
+              sql("SELECT * FROM %s WHERE dep='hr'", tableName).size());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch",
+              2L,
+              sql("SELECT * FROM %s.branch_wap WHERE dep='hr'", tableName).size());
+          Assert.assertEquals(
+              "Should not modify main branch",
+              1L,
+              sql("SELECT * FROM %s.branch_main WHERE dep='hr'", tableName).size());
+        });
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("UPDATE %s SET dep='b' WHERE dep='hr'", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table with multiple writes",
+              2L,
+              sql("SELECT * FROM %s WHERE dep='b'", tableName).size());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch with multiple writes",
+              2L,
+              sql("SELECT * FROM %s.branch_wap WHERE dep='b'", tableName).size());
+          Assert.assertEquals(
+              "Should not modify main branch with multiple writes",
+              0L,
+              sql("SELECT * FROM %s.branch_main WHERE dep='b'", tableName).size());
+        });
+  }
+
+  @Test
+  public void testUpdateToWapBranchWithTableBranchIdentifier() {
+    Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);
+
+    createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () ->
+            Assertions.assertThatThrownBy(
+                    () -> sql("UPDATE %s SET dep='hr' WHERE dep='a'", commitTarget()))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                    String.format(
+                        "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
+                        branch)));
+  }
+
   private RowLevelOperationMode mode(Table table) {
     String modeName = table.properties().getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
     return RowLevelOperationMode.fromName(modeName);

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 16 additions & 1 deletion

@@ -98,7 +98,22 @@ public String branch() {
             + "got [%s] in identifier and [%s] in options",
         branch,
         optionBranch);
-    return branch != null ? branch : optionBranch;
+    String inputBranch = branch != null ? branch : optionBranch;
+    if (inputBranch != null) {
+      return inputBranch;
+    }
+
+    boolean wapEnabled =
+        PropertyUtil.propertyAsBoolean(
+            table.properties(), TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, false);
+    if (wapEnabled) {
+      String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
+      if (wapBranch != null && table.refs().containsKey(wapBranch)) {
+        return wapBranch;
+      }
+    }
+
+    return null;
   }
 
   public String tag() {
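The resolution order this hunk introduces is worth stating in isolation: an explicit branch (from the identifier or the read options) always wins; otherwise the WAP branch applies only if WAP is enabled on the table and the branch ref already exists, which is why a session's first read before any WAP write still sees main. A standalone sketch of that logic, assuming nothing beyond the JDK (resolveReadBranch is an illustrative name, not Iceberg API):

import java.util.Map;
import java.util.Set;

class ReadBranchResolutionSketch {
  static String resolveReadBranch(
      String identifierOrOptionBranch, // branch from identifier or read option, may be null
      Map<String, String> tableProperties,
      Set<String> existingRefs, // names of refs (branches/tags) the table currently has
      String wapBranchConf) { // value of spark.wap.branch, may be null
    if (identifierOrOptionBranch != null) {
      return identifierOrOptionBranch; // an explicit branch always wins
    }
    boolean wapEnabled =
        Boolean.parseBoolean(tableProperties.getOrDefault("write.wap.enabled", "false"));
    if (wapEnabled && wapBranchConf != null && existingRefs.contains(wapBranchConf)) {
      return wapBranchConf; // reads see this session's staged WAP changes
    }
    return null; // fall back to the table's current (main) state
  }

  public static void main(String[] args) {
    Map<String, String> props = Map.of("write.wap.enabled", "true");
    // Before the first WAP write the ref does not exist yet, so reads use main:
    System.out.println(resolveReadBranch(null, props, Set.of("main"), "wap")); // null
    // After a WAP write has created the branch, unqualified reads resolve to it:
    System.out.println(resolveReadBranch(null, props, Set.of("main", "wap"), "wap")); // wap
  }
}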

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 8 additions & 0 deletions

@@ -55,4 +55,12 @@ private SparkSQLProperties() {}
 
   // Controls write distribution mode
   public static final String DISTRIBUTION_MODE = "spark.sql.iceberg.distribution-mode";
+
+  // Controls the WAP ID used for write-audit-publish workflow.
+  // When set, new snapshots will be staged with this ID in snapshot summary.
+  public static final String WAP_ID = "spark.wap.id";
+
+  // Controls the WAP branch used for write-audit-publish workflow.
+  // When set, new snapshots will be committed to this branch.
+  public static final String WAP_BRANCH = "spark.wap.branch";
 }
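The two session properties name two different staging models: WAP_BRANCH commits snapshots to a named branch (the new behavior in this commit), while WAP_ID stages snapshots without moving any branch. A sketch of the older WAP-ID flow for contrast — the table and catalog names are illustrative, and publishing via the cherrypick_snapshot procedure is an assumption based on Iceberg's documented Spark procedures, not part of this diff:

import org.apache.spark.sql.SparkSession;

public class WapIdFlowSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // With spark.wap.id set, writes are STAGED: a snapshot is created and carries
    // the ID in its summary, but no branch (including main) advances.
    spark.conf().set("spark.wap.id", "audit-1234");
    spark.sql("INSERT INTO db.tbl VALUES (1)");

    // After auditing, the staged snapshot can be published, e.g. with the
    // cherrypick_snapshot procedure (the snapshot ID shown is a placeholder).
    spark.sql("CALL my_catalog.system.cherrypick_snapshot('db.tbl', 123456789)");
  }
}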

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java

Lines changed: 24 additions & 1 deletion

@@ -30,6 +30,7 @@
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
@@ -128,7 +129,7 @@ public boolean wapEnabled() {
   }
 
   public String wapId() {
-    return sessionConf.get("spark.wap.id", null);
+    return sessionConf.get(SparkSQLProperties.WAP_ID, null);
   }
 
   public boolean mergeSchema() {
@@ -333,6 +334,28 @@ public boolean caseSensitive() {
   }
 
   public String branch() {
+    if (wapEnabled()) {
+      String wapId = wapId();
+      String wapBranch =
+          confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
+
+      ValidationException.check(
+          wapId == null || wapBranch == null,
+          "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+          wapId,
+          wapBranch);
+
+      if (wapBranch != null) {
+        ValidationException.check(
+            branch == null,
+            "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
+            branch,
+            wapBranch);
+
+        return wapBranch;
+      }
+    }
+
     return branch;
   }
 }
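The two ValidationException.check calls encode a small decision table: a WAP ID and a WAP branch are mutually exclusive, and a WAP branch cannot be combined with an explicit branch in the table identifier; otherwise the WAP branch, when present, takes precedence. A standalone sketch of that selection logic (selectWriteBranch is an illustrative name, and IllegalArgumentException stands in for Iceberg's ValidationException):

public class WriteBranchSelectionSketch {
  static String selectWriteBranch(
      boolean wapEnabled, String wapId, String wapBranch, String identifierBranch) {
    if (wapEnabled) {
      if (wapId != null && wapBranch != null) {
        throw new IllegalArgumentException(
            String.format(
                "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
                wapId, wapBranch));
      }
      if (wapBranch != null) {
        if (identifierBranch != null) {
          throw new IllegalArgumentException(
              String.format(
                  "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
                  identifierBranch, wapBranch));
        }
        return wapBranch; // session-level WAP branch wins when no explicit branch is given
      }
    }
    return identifierBranch; // may be null, meaning the table's main branch
  }

  public static void main(String[] args) {
    System.out.println(selectWriteBranch(true, null, "wap", null)); // wap
    System.out.println(selectWriteBranch(false, null, "wap", null)); // null: WAP disabled
    selectWriteBranch(true, null, "wap", "audit"); // throws: branch + WAP branch conflict
  }
}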

spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java

Lines changed: 2 additions & 1 deletion

@@ -67,6 +67,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
 import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
@@ -319,7 +320,7 @@ public void testWapFilesAreKept() throws InterruptedException {
     // normal write
     df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
 
-    spark.conf().set("spark.wap.id", "1");
+    spark.conf().set(SparkSQLProperties.WAP_ID, "1");
 
     // wap write
     df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
