Skip to content

Commit 0d9c63e

Browse files
coolderliJack Ye
authored andcommitted
Flink: Ensure temp manifest names are unique across tasks (#3986)
1 parent 614ec11 commit 0d9c63e

File tree

5 files changed

+32
-13
lines changed

5 files changed

+32
-13
lines changed

flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws
6161
}
6262
}
6363

64-
static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
65-
long attemptNumber) {
64+
static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
65+
int subTaskId, long attemptNumber) {
6666
TableOperations ops = ((HasTableOperations) table).operations();
67-
return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
67+
return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
68+
subTaskId, attemptNumber);
6869
}
6970

7071
static DeltaManifests writeCompletedFiles(WriteResult result,

flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ public void initializeState(StateInitializationContext context) throws Exception
129129

130130
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
131131
int attemptId = getRuntimeContext().getAttemptNumber();
132-
this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
132+
String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
133+
this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
134+
subTaskId, attemptId);
133135
this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
134136

135137
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);

flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
3535
private final FileIO io;
3636
private final Map<String, String> props;
3737
private final String flinkJobId;
38+
private final String operatorUniqueId;
3839
private final int subTaskId;
3940
private final long attemptNumber;
4041
private final AtomicInteger fileCount = new AtomicInteger(0);
4142

4243
ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
43-
String flinkJobId, int subTaskId, long attemptNumber) {
44+
String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
4445
this.ops = ops;
4546
this.io = io;
4647
this.props = props;
4748
this.flinkJobId = flinkJobId;
49+
this.operatorUniqueId = operatorUniqueId;
4850
this.subTaskId = subTaskId;
4951
this.attemptNumber = attemptNumber;
5052
}
5153

5254
private String generatePath(long checkpointId) {
53-
return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
54-
attemptNumber, checkpointId, fileCount.incrementAndGet()));
55+
return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
56+
subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
5557
}
5658

5759
OutputFile create(long checkpointId) {

flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,10 @@ public void before() throws IOException {
8787
@Test
8888
public void testIO() throws IOException {
8989
String flinkJobId = newFlinkJobId();
90+
String operatorId = newOperatorUniqueId();
9091
for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
9192
ManifestOutputFileFactory factory =
92-
FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
93+
FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
9394
final long curCkpId = checkpointId;
9495

9596
List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public void testIO() throws IOException {
122123
public void testUserProvidedManifestLocation() throws IOException {
123124
long checkpointId = 1;
124125
String flinkJobId = newFlinkJobId();
126+
String operatorId = newOperatorUniqueId();
125127
File userProvidedFolder = tempFolder.newFolder();
126128
Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
127129
ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
128130
((HasTableOperations) table).operations(), table.io(), props,
129-
flinkJobId, 1, 1);
131+
flinkJobId, operatorId, 1, 1);
130132

131133
List<DataFile> dataFiles = generateDataFiles(5);
132134
DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public void testUserProvidedManifestLocation() throws IOException {
156158
public void testVersionedSerializer() throws IOException {
157159
long checkpointId = 1;
158160
String flinkJobId = newFlinkJobId();
159-
ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
161+
String operatorId = newOperatorUniqueId();
162+
ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
163+
1, 1);
160164

161165
List<DataFile> dataFiles = generateDataFiles(10);
162166
List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public void testCompatibility() throws IOException {
186190
// The v2 deserializer should be able to deserialize the v1 binary.
187191
long checkpointId = 1;
188192
String flinkJobId = newFlinkJobId();
189-
ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
193+
String operatorId = newOperatorUniqueId();
194+
ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
195+
1, 1);
190196

191197
List<DataFile> dataFiles = generateDataFiles(10);
192198
ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ private List<DeleteFile> generatePosDeleteFiles(int fileNum) throws IOException
271277
private static String newFlinkJobId() {
272278
return UUID.randomUUID().toString();
273279
}
280+
281+
private static String newOperatorUniqueId() {
282+
return UUID.randomUUID().toString();
283+
}
274284
}

flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -599,8 +599,10 @@ public void testFlinkManifests() throws Exception {
599599
harness.snapshot(checkpoint, ++timestamp);
600600
List<Path> manifestPaths = assertFlinkManifests(1);
601601
Path manifestPath = manifestPaths.get(0);
602+
String operatorId = harness.getOneInputOperator().getOperatorID().toString();
602603
Assert.assertEquals("File name should have the expected pattern.",
603-
String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
604+
String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
605+
manifestPath.getFileName().toString());
604606

605607
// 2. Read the data files from manifests and assert.
606608
List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public void testDeleteFiles() throws Exception {
640642
harness.snapshot(checkpoint, ++timestamp);
641643
List<Path> manifestPaths = assertFlinkManifests(1);
642644
Path manifestPath = manifestPaths.get(0);
645+
String operatorId = harness.getOneInputOperator().getOperatorID().toString();
643646
Assert.assertEquals("File name should have the expected pattern.",
644-
String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
647+
String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
648+
manifestPath.getFileName().toString());
645649

646650
// 2. Read the data files from manifests and assert.
647651
List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());

0 commit comments

Comments
 (0)