Commit 6d2edd6

Spark 3.2: Expose action classes in SparkActions (#5261)
1 parent 2ad3c1a commit 6d2edd6

24 files changed (+2172, -1856 lines)

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java

Lines changed: 12 additions & 306 deletions
@@ -19,346 +19,52 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 /**
- * An action that removes orphan metadata, data and delete files by listing a given location and comparing
- * the actual files in that location with content and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link Table#location()} and
- * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} argument. This
- * skips the directory listing - any files in the dataset provided which are not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *     use {@link SparkActions} and {@link DeleteOrphanFilesSparkAction} instead.
  */
-public class BaseDeleteOrphanFilesSparkAction
-    extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
-  private final SerializableConfiguration hadoopConf;
-  private final int partitionDiscoveryParallelism;
-  private final Table table;
-  private final Consumer<String> defaultDelete = new Consumer<String>() {
-    @Override
-    public void accept(String file) {
-      table.io().deleteFile(file);
-    }
-  };
-
-  private String location = null;
-  private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
-  private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;
-  private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends DeleteOrphanFilesSparkAction {
 
   public BaseDeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
-    super(spark);
-
-    this.hadoopConf = new SerializableConfiguration(spark.sessionState().newHadoopConf());
-    this.partitionDiscoveryParallelism = spark.sessionState().conf().parallelPartitionDiscoveryParallelism();
-    this.table = table;
-    this.location = table.location();
-
-    ValidationException.check(
-        PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
-        "Cannot delete orphan files: GC is disabled (deleting files may corrupt other tables)");
-  }
-
-  @Override
-  protected DeleteOrphanFiles self() {
-    return this;
+    super(spark, table);
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction executeDeleteWith(ExecutorService executorService) {
-    this.deleteExecutorService = executorService;
+    super.executeDeleteWith(executorService);
     return this;
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction location(String newLocation) {
-    this.location = newLocation;
+    super.location(newLocation);
     return this;
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
-    this.olderThanTimestamp = newOlderThanTimestamp;
+    super.olderThan(newOlderThanTimestamp);
     return this;
   }
 
   @Override
   public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
-    this.deleteFunc = newDeleteFunc;
+    super.deleteWith(newDeleteFunc);
     return this;
   }
 
   public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
-    StructType schema = files.schema();
-
-    StructField filePathField = schema.apply(FILE_PATH);
-    Preconditions.checkArgument(
-        filePathField.dataType() == DataTypes.StringType,
-        "Invalid %s column: %s is not a string",
-        FILE_PATH,
-        filePathField.dataType());
-
-    StructField lastModifiedField = schema.apply(LAST_MODIFIED);
-    Preconditions.checkArgument(
-        lastModifiedField.dataType() == DataTypes.TimestampType,
-        "Invalid %s column: %s is not a timestamp",
-        LAST_MODIFIED,
-        lastModifiedField.dataType());
-
-    this.compareToFileList = files;
+    super.compareToFileList(files);
     return this;
   }
-
-  private Dataset<Row> filteredCompareToFileList() {
-    Dataset<Row> files = compareToFileList;
-    if (location != null) {
-      files = files.filter(files.col(FILE_PATH).startsWith(location));
-    }
-    return files
-        .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
-        .select(files.col(FILE_PATH));
-  }
-
-  @Override
-  public DeleteOrphanFiles.Result execute() {
-    JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
-    return withJobGroupInfo(info, this::doExecute);
-  }
-
-  private String jobDesc() {
-    List<String> options = Lists.newArrayList();
-    options.add("older_than=" + olderThanTimestamp);
-    if (location != null) {
-      options.add("location=" + location);
-    }
-    return String.format("Deleting orphan files (%s) from %s", Joiner.on(',').join(options), table.name());
-  }
-
-  private DeleteOrphanFiles.Result doExecute() {
-    Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
-    Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
-    Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
-    Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();
-
-    Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
-    Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
-    Column nameEqual = actualFileName.equalTo(validFileName);
-    Column actualContains = actualFileDF.col(FILE_PATH).contains(validFileDF.col(FILE_PATH));
-    Column joinCond = nameEqual.and(actualContains);
-    List<String> orphanFiles = actualFileDF.join(validFileDF, joinCond, "leftanti")
-        .as(Encoders.STRING())
-        .collectAsList();
-
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
-
-    return new BaseDeleteOrphanFilesActionResult(orphanFiles);
-  }
-
-  private Dataset<Row> buildActualFileDF() {
-    List<String> subDirs = Lists.newArrayList();
-    List<String> matchingFiles = Lists.newArrayList();
-
-    Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-    PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
-
-    // list at most 3 levels and only dirs that have less than 10 direct sub dirs on the driver
-    listDirRecursively(location, predicate, hadoopConf.value(), 3, 10, subDirs, pathFilter, matchingFiles);
-
-    JavaRDD<String> matchingFileRDD = sparkContext().parallelize(matchingFiles, 1);
-
-    if (subDirs.isEmpty()) {
-      return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
-    }
-
-    int parallelism = Math.min(subDirs.size(), partitionDiscoveryParallelism);
-    JavaRDD<String> subDirRDD = sparkContext().parallelize(subDirs, parallelism);
-
-    Broadcast<SerializableConfiguration> conf = sparkContext().broadcast(hadoopConf);
-    JavaRDD<String> matchingLeafFileRDD = subDirRDD.mapPartitions(
-        listDirsRecursively(conf, olderThanTimestamp, pathFilter)
-    );
-
-    JavaRDD<String> completeMatchingFileRDD = matchingFileRDD.union(matchingLeafFileRDD);
-    return spark().createDataset(completeMatchingFileRDD.rdd(), Encoders.STRING()).toDF(FILE_PATH);
-  }
-
-  private static void listDirRecursively(
-      String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
-      int maxDirectSubDirs, List<String> remainingSubDirs, PathFilter pathFilter, List<String> matchingFiles) {
-
-    // stop listing whenever we reach the max depth
-    if (maxDepth <= 0) {
-      remainingSubDirs.add(dir);
-      return;
-    }
-
-    try {
-      Path path = new Path(dir);
-      FileSystem fs = path.getFileSystem(conf);
-
-      List<String> subDirs = Lists.newArrayList();
-
-      for (FileStatus file : fs.listStatus(path, pathFilter)) {
-        if (file.isDirectory()) {
-          subDirs.add(file.getPath().toString());
-        } else if (file.isFile() && predicate.test(file)) {
-          matchingFiles.add(file.getPath().toString());
-        }
-      }
-
-      // stop listing if the number of direct sub dirs is bigger than maxDirectSubDirs
-      if (subDirs.size() > maxDirectSubDirs) {
-        remainingSubDirs.addAll(subDirs);
-        return;
-      }
-
-      for (String subDir : subDirs) {
-        listDirRecursively(
-            subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, pathFilter, matchingFiles);
-      }
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
-  }
-
-  private static FlatMapFunction<Iterator<String>, String> listDirsRecursively(
-      Broadcast<SerializableConfiguration> conf,
-      long olderThanTimestamp,
-      PathFilter pathFilter) {
-
-    return dirs -> {
-      List<String> subDirs = Lists.newArrayList();
-      List<String> files = Lists.newArrayList();
-
-      Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
-
-      int maxDepth = 2000;
-      int maxDirectSubDirs = Integer.MAX_VALUE;
-
-      dirs.forEachRemaining(dir -> {
-        listDirRecursively(
-            dir, predicate, conf.value().value(), maxDepth, maxDirectSubDirs, subDirs, pathFilter, files);
-      });
-
-      if (!subDirs.isEmpty()) {
-        throw new RuntimeException("Could not list subdirectories, reached maximum subdirectory depth: " + maxDepth);
-      }
-
-      return files.iterator();
-    };
-  }
-
-  /**
-   * A {@link PathFilter} that filters out hidden path, but does not filter out paths that would be marked
-   * as hidden by {@link HiddenPathFilter} due to a partition field that starts with one of the characters that
-   * indicate a hidden path.
-   */
-  @VisibleForTesting
-  static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {
-
-    private final Set<String> hiddenPathPartitionNames;
-
-    PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames) {
-      this.hiddenPathPartitionNames = hiddenPathPartitionNames;
-    }
-
-    @Override
-    public boolean accept(Path path) {
-      boolean isHiddenPartitionPath = hiddenPathPartitionNames.stream().anyMatch(path.getName()::startsWith);
-      return isHiddenPartitionPath || HiddenPathFilter.get().accept(path);
-    }
-
-    static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
-      if (specs == null) {
-        return HiddenPathFilter.get();
-      }
-
-      Set<String> partitionNames = specs.values().stream()
-          .map(PartitionSpec::fields)
-          .flatMap(List::stream)
-          .filter(partitionField -> partitionField.name().startsWith("_") || partitionField.name().startsWith("."))
-          .map(partitionField -> partitionField.name() + "=")
-          .collect(Collectors.toSet());
-
-      return partitionNames.isEmpty() ? HiddenPathFilter.get() : new PartitionAwareHiddenPathFilter(partitionNames);
-    }
-  }
 }
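The compareToFileList(Dataset<Row>) override above still delegates to the new parent, so the input contract is unchanged: a dataset with a string file path column and a timestamp last-modified column (the FILE_PATH and LAST_MODIFIED constants validated in the removed code, which in BaseSparkAction resolve to "file_path" and "last_modified"). A minimal sketch of building such a dataset, with an illustrative path and timestamp:

    import java.sql.Timestamp;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class FileListExample {
      public static Dataset<Row> fileList(SparkSession spark) {
        // Schema the action validates: file_path must be a string,
        // last_modified must be a timestamp.
        StructType schema = new StructType()
            .add("file_path", DataTypes.StringType)
            .add("last_modified", DataTypes.TimestampType);
        // One illustrative row; a real listing would come from an inventory
        // or object-store manifest.
        List<Row> rows = Arrays.asList(
            RowFactory.create(
                "s3://bucket/warehouse/db/tbl/data/00000-0-example.parquet",
                Timestamp.valueOf("2022-01-01 00:00:00")));
        return spark.createDataFrame(rows, schema);
      }
    }

Passing such a dataset through compareToFileList skips the Hadoop directory listing entirely; only files present in the dataset but absent from table metadata (and older than the olderThan cutoff) are treated as orphans.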
