Skip to content

Commit 29e60c4

Browse files
bryanck authored and danielcweeks committed
Spark: broadcast table instead of file IO in rewrite manifests (#7263)
1 parent f09f1e1 commit 29e60c4

File tree

1 file changed

+16
-16
lines changed

1 file changed

+16
-16
lines changed

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java

Lines changed: 16 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iceberg.ManifestWriter;
3737
import org.apache.iceberg.PartitionSpec;
3838
import org.apache.iceberg.Partitioning;
39+
import org.apache.iceberg.SerializableTable;
3940
import org.apache.iceberg.Snapshot;
4041
import org.apache.iceberg.Table;
4142
import org.apache.iceberg.TableOperations;
@@ -44,15 +45,13 @@
4445
import org.apache.iceberg.actions.RewriteManifests;
4546
import org.apache.iceberg.exceptions.CommitStateUnknownException;
4647
import org.apache.iceberg.exceptions.ValidationException;
47-
import org.apache.iceberg.io.FileIO;
4848
import org.apache.iceberg.io.OutputFile;
4949
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
5050
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
5151
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
5252
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
5353
import org.apache.iceberg.spark.JobGroupInfo;
5454
import org.apache.iceberg.spark.SparkDataFile;
55-
import org.apache.iceberg.spark.SparkUtil;
5655
import org.apache.iceberg.types.Types;
5756
import org.apache.iceberg.util.PropertyUtil;
5857
import org.apache.iceberg.util.Tasks;
@@ -90,7 +89,6 @@ public class RewriteManifestsSparkAction
9089
private final Encoder<ManifestFile> manifestEncoder;
9190
private final Table table;
9291
private final int formatVersion;
93-
private final FileIO fileIO;
9492
private final long targetManifestSizeBytes;
9593

9694
private PartitionSpec spec = null;
@@ -107,7 +105,6 @@ public class RewriteManifestsSparkAction
107105
table.properties(),
108106
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
109107
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
110-
this.fileIO = SparkUtil.serializableFileIO(table);
111108

112109
// default the staging location to the metadata location
113110
TableOperations ops = ((HasTableOperations) table).operations();
@@ -216,7 +213,7 @@ private Dataset<Row> buildManifestEntryDF(List<ManifestFile> manifests) {
216213

217214
private List<ManifestFile> writeManifestsForUnpartitionedTable(
218215
Dataset<Row> manifestEntryDF, int numManifests) {
219-
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
216+
Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
220217
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
221218
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
222219

@@ -228,7 +225,7 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
228225
.repartition(numManifests)
229226
.mapPartitions(
230227
toManifests(
231-
io,
228+
tableBroadcast,
232229
maxNumManifestEntries,
233230
stagingLocation,
234231
formatVersion,
@@ -242,7 +239,7 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
242239
private List<ManifestFile> writeManifestsForPartitionedTable(
243240
Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
244241

245-
Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
242+
Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
246243
StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
247244
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
248245

@@ -258,7 +255,7 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
258255
.sortWithinPartitions(partitionColumn)
259256
.mapPartitions(
260257
toManifests(
261-
io,
258+
tableBroadcast,
262259
maxNumManifestEntries,
263260
stagingLocation,
264261
formatVersion,
@@ -298,7 +295,7 @@ private List<ManifestFile> findMatchingManifests() {
298295
return ImmutableList.of();
299296
}
300297

301-
return currentSnapshot.dataManifests(fileIO).stream()
298+
return currentSnapshot.dataManifests(table.io()).stream()
302299
.filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
303300
.collect(Collectors.toList());
304301
}
@@ -351,14 +348,14 @@ private void deleteFiles(Iterable<String> locations) {
351348
.noRetry()
352349
.suppressFailureWhenFinished()
353350
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
354-
.run(fileIO::deleteFile);
351+
.run(location -> table.io().deleteFile(location));
355352
}
356353

357354
private static ManifestFile writeManifest(
358355
List<Row> rows,
359356
int startIndex,
360357
int endIndex,
361-
Broadcast<FileIO> io,
358+
Broadcast<Table> tableBroadcast,
362359
String location,
363360
int format,
364361
Types.StructType combinedPartitionType,
@@ -369,7 +366,10 @@ private static ManifestFile writeManifest(
369366
String manifestName = "optimized-m-" + UUID.randomUUID();
370367
Path manifestPath = new Path(location, manifestName);
371368
OutputFile outputFile =
372-
io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
369+
tableBroadcast
370+
.value()
371+
.io()
372+
.newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
373373

374374
Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
375375
Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
@@ -394,7 +394,7 @@ private static ManifestFile writeManifest(
394394
}
395395

396396
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
397-
Broadcast<FileIO> io,
397+
Broadcast<Table> tableBroadcast,
398398
long maxNumManifestEntries,
399399
String location,
400400
int format,
@@ -416,7 +416,7 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
416416
rowsAsList,
417417
0,
418418
rowsAsList.size(),
419-
io,
419+
tableBroadcast,
420420
location,
421421
format,
422422
combinedPartitionType,
@@ -429,7 +429,7 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
429429
rowsAsList,
430430
0,
431431
midIndex,
432-
io,
432+
tableBroadcast,
433433
location,
434434
format,
435435
combinedPartitionType,
@@ -440,7 +440,7 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
440440
rowsAsList,
441441
midIndex,
442442
rowsAsList.size(),
443-
io,
443+
tableBroadcast,
444444
location,
445445
format,
446446
combinedPartitionType,

0 commit comments

Comments
 (0)