Skip to content

Commit 2ad3c1a

Browse files
authored
Spark 3.3: Expose action classes in SparkActions (#5257)
1 parent 774b2f7 commit 2ad3c1a

17 files changed (+144 / -181 lines changed)

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSnapshotUpdateSparkAction.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -20,20 +20,17 @@
2020
package org.apache.iceberg.spark.actions;
2121

2222
import java.util.Map;
23-
import org.apache.iceberg.actions.SnapshotUpdate;
2423
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
2524
import org.apache.spark.sql.SparkSession;
2625

27-
abstract class BaseSnapshotUpdateSparkAction<ThisT, R>
28-
extends BaseSparkAction<ThisT, R> implements SnapshotUpdate<ThisT, R> {
26+
abstract class BaseSnapshotUpdateSparkAction<ThisT> extends BaseSparkAction<ThisT> {
2927

3028
private final Map<String, String> summary = Maps.newHashMap();
3129

3230
protected BaseSnapshotUpdateSparkAction(SparkSession spark) {
3331
super(spark);
3432
}
3533

36-
@Override
3734
public ThisT snapshotProperty(String property, String value) {
3835
summary.put(property, value);
3936
return self();

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,6 @@
3434
import org.apache.iceberg.StaticTableOperations;
3535
import org.apache.iceberg.Table;
3636
import org.apache.iceberg.TableMetadata;
37-
import org.apache.iceberg.actions.Action;
3837
import org.apache.iceberg.io.CloseableIterator;
3938
import org.apache.iceberg.io.ClosingIterator;
4039
import org.apache.iceberg.io.FileIO;
@@ -59,7 +58,7 @@
5958
import static org.apache.spark.sql.functions.col;
6059
import static org.apache.spark.sql.functions.lit;
6160

62-
abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
61+
abstract class BaseSparkAction<ThisT> {
6362

6463
protected static final String CONTENT_FILE = "Content File";
6564
protected static final String MANIFEST = "Manifest";
@@ -91,13 +90,11 @@ protected JavaSparkContext sparkContext() {
9190

9291
protected abstract ThisT self();
9392

94-
@Override
9593
public ThisT option(String name, String value) {
9694
options.put(name, value);
9795
return self();
9896
}
9997

100-
@Override
10198
public ThisT options(Map<String, String> newOptions) {
10299
options.putAll(newOptions);
103100
return self();

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseTableCreationSparkAction.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@
4949
import org.apache.spark.sql.connector.expressions.Transform;
5050
import org.apache.spark.sql.types.StructType;
5151

52-
abstract class BaseTableCreationSparkAction<ThisT, R> extends BaseSparkAction<ThisT, R> {
52+
abstract class BaseTableCreationSparkAction<ThisT> extends BaseSparkAction<ThisT> {
5353
private static final Set<String> ALLOWED_SOURCES = ImmutableSet.of("parquet", "avro", "orc", "hive");
5454
protected static final String LOCATION = "location";
5555
protected static final String ICEBERG_METADATA_FOLDER = "metadata";

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java renamed to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -90,10 +90,10 @@
9090
* <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
9191
* the state of the table if another operation is writing at the same time.
9292
*/
93-
public class BaseDeleteOrphanFilesSparkAction
94-
extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
93+
public class DeleteOrphanFilesSparkAction
94+
extends BaseSparkAction<DeleteOrphanFilesSparkAction> implements DeleteOrphanFiles {
9595

96-
private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
96+
private static final Logger LOG = LoggerFactory.getLogger(DeleteOrphanFilesSparkAction.class);
9797
private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
9898
int lastIndex = path.lastIndexOf(File.separator);
9999
if (lastIndex == -1) {
@@ -119,7 +119,7 @@ public void accept(String file) {
119119
private Consumer<String> deleteFunc = defaultDelete;
120120
private ExecutorService deleteExecutorService = null;
121121

122-
public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
122+
DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
123123
super(spark);
124124

125125
this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
@@ -133,35 +133,35 @@ public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
133133
}
134134

135135
@Override
136-
protected DeleteOrphanFiles self() {
136+
protected DeleteOrphanFilesSparkAction self() {
137137
return this;
138138
}
139139

140140
@Override
141-
public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
141+
public DeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
142142
this.deleteExecutorService = executorService;
143143
return this;
144144
}
145145

146146
@Override
147-
public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
147+
public DeleteOrphanFilesSparkAction location(String newLocation) {
148148
this.location = newLocation;
149149
return this;
150150
}
151151

152152
@Override
153-
public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
153+
public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
154154
this.olderThanTimestamp = newOlderThanTimestamp;
155155
return this;
156156
}
157157

158158
@Override
159-
public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
159+
public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
160160
this.deleteFunc = newDeleteFunc;
161161
return this;
162162
}
163163

164-
public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
164+
public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
165165
StructType schema = files.schema();
166166

167167
StructField filePathField = schema.apply(FILE_PATH);

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java renamed to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteReachableFilesSparkAction.java

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -50,13 +50,13 @@
5050
* to determine which files should be deleted.
5151
*/
5252
@SuppressWarnings("UnnecessaryAnonymousClass")
53-
public class BaseDeleteReachableFilesSparkAction
54-
extends BaseSparkAction<DeleteReachableFiles, DeleteReachableFiles.Result> implements DeleteReachableFiles {
53+
public class DeleteReachableFilesSparkAction
54+
extends BaseSparkAction<DeleteReachableFilesSparkAction> implements DeleteReachableFiles {
5555

5656
public static final String STREAM_RESULTS = "stream-results";
5757
public static final boolean STREAM_RESULTS_DEFAULT = false;
5858

59-
private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
59+
private static final Logger LOG = LoggerFactory.getLogger(DeleteReachableFilesSparkAction.class);
6060

6161
private final String metadataFileLocation;
6262
private final Consumer<String> defaultDelete = new Consumer<String>() {
@@ -70,30 +70,30 @@ public void accept(String file) {
7070
private ExecutorService deleteExecutorService = null;
7171
private FileIO io = new HadoopFileIO(spark().sessionState().newHadoopConf());
7272

73-
public BaseDeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
73+
DeleteReachableFilesSparkAction(SparkSession spark, String metadataFileLocation) {
7474
super(spark);
7575
this.metadataFileLocation = metadataFileLocation;
7676
}
7777

7878
@Override
79-
protected DeleteReachableFiles self() {
79+
protected DeleteReachableFilesSparkAction self() {
8080
return this;
8181
}
8282

8383
@Override
84-
public DeleteReachableFiles io(FileIO fileIO) {
84+
public DeleteReachableFilesSparkAction io(FileIO fileIO) {
8585
this.io = fileIO;
8686
return this;
8787
}
8888

8989
@Override
90-
public DeleteReachableFiles deleteWith(Consumer<String> newDeleteFunc) {
90+
public DeleteReachableFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
9191
this.deleteFunc = newDeleteFunc;
9292
return this;
9393
}
9494

9595
@Override
96-
public DeleteReachableFiles executeDeleteWith(ExecutorService executorService) {
96+
public DeleteReachableFilesSparkAction executeDeleteWith(ExecutorService executorService) {
9797
this.deleteExecutorService = executorService;
9898
return this;
9999
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java renamed to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -65,13 +65,13 @@
6565
* Deletes are still performed locally after retrieving the results from the Spark executors.
6666
*/
6767
@SuppressWarnings("UnnecessaryAnonymousClass")
68-
public class BaseExpireSnapshotsSparkAction
69-
extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
68+
public class ExpireSnapshotsSparkAction
69+
extends BaseSparkAction<ExpireSnapshotsSparkAction> implements ExpireSnapshots {
7070

7171
public static final String STREAM_RESULTS = "stream-results";
7272
public static final boolean STREAM_RESULTS_DEFAULT = false;
7373

74-
private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
74+
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsSparkAction.class);
7575

7676
private final Table table;
7777
private final TableOperations ops;
@@ -89,7 +89,7 @@ public void accept(String file) {
8989
private ExecutorService deleteExecutorService = null;
9090
private Dataset<Row> expiredFiles = null;
9191

92-
public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
92+
ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
9393
super(spark);
9494
this.table = table;
9595
this.ops = ((HasTableOperations) table).operations();
@@ -100,38 +100,38 @@ public BaseExpireSnapshotsSparkAction(SparkSession spark, Table table) {
100100
}
101101

102102
@Override
103-
protected ExpireSnapshots self() {
103+
protected ExpireSnapshotsSparkAction self() {
104104
return this;
105105
}
106106

107107
@Override
108-
public BaseExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
108+
public ExpireSnapshotsSparkAction executeDeleteWith(ExecutorService executorService) {
109109
this.deleteExecutorService = executorService;
110110
return this;
111111
}
112112

113113
@Override
114-
public BaseExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
114+
public ExpireSnapshotsSparkAction expireSnapshotId(long snapshotId) {
115115
expiredSnapshotIds.add(snapshotId);
116116
return this;
117117
}
118118

119119
@Override
120-
public BaseExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
120+
public ExpireSnapshotsSparkAction expireOlderThan(long timestampMillis) {
121121
this.expireOlderThanValue = timestampMillis;
122122
return this;
123123
}
124124

125125
@Override
126-
public BaseExpireSnapshotsSparkAction retainLast(int numSnapshots) {
126+
public ExpireSnapshotsSparkAction retainLast(int numSnapshots) {
127127
Preconditions.checkArgument(1 <= numSnapshots,
128128
"Number of snapshots to retain must be at least 1, cannot be: %s", numSnapshots);
129129
this.retainLastValue = numSnapshots;
130130
return this;
131131
}
132132

133133
@Override
134-
public BaseExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
134+
public ExpireSnapshotsSparkAction deleteWith(Consumer<String> newDeleteFunc) {
135135
this.deleteFunc = newDeleteFunc;
136136
return this;
137137
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseMigrateTableSparkAction.java renamed to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -50,18 +50,18 @@
5050
* previously referred to a non-Iceberg table will refer to the newly migrated Iceberg
5151
* table.
5252
*/
53-
public class BaseMigrateTableSparkAction
54-
extends BaseTableCreationSparkAction<MigrateTable, MigrateTable.Result>
53+
public class MigrateTableSparkAction
54+
extends BaseTableCreationSparkAction<MigrateTableSparkAction>
5555
implements MigrateTable {
5656

57-
private static final Logger LOG = LoggerFactory.getLogger(BaseMigrateTableSparkAction.class);
57+
private static final Logger LOG = LoggerFactory.getLogger(MigrateTableSparkAction.class);
5858
private static final String BACKUP_SUFFIX = "_BACKUP_";
5959

6060
private final StagingTableCatalog destCatalog;
6161
private final Identifier destTableIdent;
6262
private final Identifier backupIdent;
6363

64-
public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
64+
MigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatalog, Identifier sourceTableIdent) {
6565
super(spark, sourceCatalog, sourceTableIdent);
6666
this.destCatalog = checkDestinationCatalog(sourceCatalog);
6767
this.destTableIdent = sourceTableIdent;
@@ -70,7 +70,7 @@ public BaseMigrateTableSparkAction(SparkSession spark, CatalogPlugin sourceCatal
7070
}
7171

7272
@Override
73-
protected MigrateTable self() {
73+
protected MigrateTableSparkAction self() {
7474
return this;
7575
}
7676

@@ -85,13 +85,13 @@ protected Identifier destTableIdent() {
8585
}
8686

8787
@Override
88-
public MigrateTable tableProperties(Map<String, String> properties) {
88+
public MigrateTableSparkAction tableProperties(Map<String, String> properties) {
8989
setProperties(properties);
9090
return this;
9191
}
9292

9393
@Override
94-
public MigrateTable tableProperty(String property, String value) {
94+
public MigrateTableSparkAction tableProperty(String property, String value) {
9595
setProperty(property, value);
9696
return this;
9797
}

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java renamed to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java

Lines changed: 11 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -72,10 +72,11 @@
7272
import org.slf4j.Logger;
7373
import org.slf4j.LoggerFactory;
7474

75-
public class BaseRewriteDataFilesSparkAction
76-
extends BaseSnapshotUpdateSparkAction<RewriteDataFiles, RewriteDataFiles.Result> implements RewriteDataFiles {
75+
public class RewriteDataFilesSparkAction
76+
extends BaseSnapshotUpdateSparkAction<RewriteDataFilesSparkAction>
77+
implements RewriteDataFiles {
7778

78-
private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesSparkAction.class);
79+
private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesSparkAction.class);
7980
private static final Set<String> VALID_OPTIONS = ImmutableSet.of(
8081
MAX_CONCURRENT_FILE_GROUP_REWRITES,
8182
MAX_FILE_GROUP_SIZE_BYTES,
@@ -96,50 +97,50 @@ public class BaseRewriteDataFilesSparkAction
9697
private RewriteJobOrder rewriteJobOrder;
9798
private RewriteStrategy strategy = null;
9899

99-
protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
100+
RewriteDataFilesSparkAction(SparkSession spark, Table table) {
100101
super(spark);
101102
this.table = table;
102103
}
103104

104105
@Override
105-
protected RewriteDataFiles self() {
106+
protected RewriteDataFilesSparkAction self() {
106107
return this;
107108
}
108109

109110
@Override
110-
public RewriteDataFiles binPack() {
111+
public RewriteDataFilesSparkAction binPack() {
111112
Preconditions.checkArgument(this.strategy == null,
112113
"Cannot set strategy to binpack, it has already been set", this.strategy);
113114
this.strategy = binPackStrategy();
114115
return this;
115116
}
116117

117118
@Override
118-
public RewriteDataFiles sort(SortOrder sortOrder) {
119+
public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
119120
Preconditions.checkArgument(this.strategy == null,
120121
"Cannot set strategy to sort, it has already been set to %s", this.strategy);
121122
this.strategy = sortStrategy().sortOrder(sortOrder);
122123
return this;
123124
}
124125

125126
@Override
126-
public RewriteDataFiles sort() {
127+
public RewriteDataFilesSparkAction sort() {
127128
Preconditions.checkArgument(this.strategy == null,
128129
"Cannot set strategy to sort, it has already been set to %s", this.strategy);
129130
this.strategy = sortStrategy();
130131
return this;
131132
}
132133

133134
@Override
134-
public RewriteDataFiles zOrder(String... columnNames) {
135+
public RewriteDataFilesSparkAction zOrder(String... columnNames) {
135136
Preconditions.checkArgument(this.strategy == null,
136137
"Cannot set strategy to zorder, it has already been set to %s", this.strategy);
137138
this.strategy = zOrderStrategy(columnNames);
138139
return this;
139140
}
140141

141142
@Override
142-
public RewriteDataFiles filter(Expression expression) {
143+
public RewriteDataFilesSparkAction filter(Expression expression) {
143144
filter = Expressions.and(filter, expression);
144145
return this;
145146
}

0 commit comments

Comments
 (0)