From 6ca116d3e13b4b87746aa1a8715df4070ec2d8fa Mon Sep 17 00:00:00 2001
From: lrsb
Date: Tue, 21 Apr 2026 14:21:44 +0200
Subject: [PATCH 1/4] Flink: Fix duplicate commits when Flink jobId changes on restart

DynamicCommitter dedupes pending commit requests against the table's
ancestor snapshots by looking up snapshots whose FLINK_JOB_ID and
OPERATOR_ID match the committable that produced them. The lookup used a
single (jobId, operatorId) pair picked from the last committable in the
batch, which breaks down when the batch spans multiple Flink jobIds.

After a stop-with-savepoint, the committer's state replays in-flight
committables tagged with the previous Flink jobId alongside fresh
committables tagged with the new jobId. Using only the new jobId for
dedup hides the snapshot that the previous jobId already produced, so
the replayed committable's data files are committed a second time and
the affected rows appear twice in the target table.

Partition commit requests by (table, branch, jobId, operatorId) so each
group is deduped and stamped with its own jobId. The downstream commit
pipeline is unchanged; it already assumes a single (jobId, operatorId)
per batch, which the new grouping restores in the mixed-jobId case.

Change-Id: I2e0322a8f6564f960dd5c4730009c20e533223db
---
 .../flink/sink/dynamic/CommitGroupKey.java    |  70 ++++++++++++
 .../flink/sink/dynamic/DynamicCommitter.java  |  20 ++--
 .../iceberg/flink/sink/dynamic/TableKey.java  |   5 -
 .../sink/dynamic/TestDynamicCommitter.java    | 106 ++++++++++++++++++
 4 files changed, 186 insertions(+), 15 deletions(-)
 create mode 100644 flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java
new file mode 100644
index 000000000000..8f3aff1dfd6c
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.util.Objects;
+
+class CommitGroupKey {
+  private final String tableName;
+  private final String branch;
+  private final String jobId;
+  private final String operatorId;
+
+  CommitGroupKey(DynamicCommittable committable) {
+    this.tableName = committable.key().tableName();
+    this.branch = committable.key().branch();
+    this.jobId = committable.jobId();
+    this.operatorId = committable.operatorId();
+  }
+
+  String tableName() {
+    return tableName;
+  }
+
+  String branch() {
+    return branch;
+  }
+
+  String jobId() {
+    return jobId;
+  }
+
+  String operatorId() {
+    return operatorId;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof CommitGroupKey that)) {
+      return false;
+    }
+    return tableName.equals(that.tableName)
+        && branch.equals(that.branch)
+        && Objects.equals(jobId, that.jobId)
+        && Objects.equals(operatorId, that.operatorId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tableName, branch, jobId, operatorId);
+  }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index 5e824773f4bf..366412c5b939 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -119,28 +119,28 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
     DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the
     1.11 release first to migrate their state to a single commit request per checkpoint. */
-    Map<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> commitRequestMap =
-        Maps.newHashMap();
+    Map<CommitGroupKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
+        commitRequestMap = Maps.newHashMap();
     for (CommitRequest<DynamicCommittable> request : commitRequests) {
       NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> committables =
           commitRequestMap.computeIfAbsent(
-              new TableKey(request.getCommittable()), unused -> Maps.newTreeMap());
+              new CommitGroupKey(request.getCommittable()), unused -> Maps.newTreeMap());
       committables
           .computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList())
           .add(request);
     }

-    for (Map.Entry<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> entry :
-        commitRequestMap.entrySet()) {
-      Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
-      DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
-      Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+    for (Map.Entry<CommitGroupKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
+        entry : commitRequestMap.entrySet()) {
+      CommitGroupKey groupKey = entry.getKey();
+      Table table = catalog.loadTable(TableIdentifier.parse(groupKey.tableName()));
+      Snapshot latestSnapshot = table.snapshot(groupKey.branch());
       Iterable<Snapshot> ancestors =
           latestSnapshot != null
              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
              : List.of();
       long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
+          getMaxCommittedCheckpointId(ancestors, groupKey.jobId(), groupKey.operatorId());

       NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> skippedCommitRequests =
           entry.getValue().headMap(maxCommittedCheckpointId, true);
@@ -155,7 +155,7 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
           entry.getValue().tailMap(maxCommittedCheckpointId, false);
       if (!uncommitted.isEmpty()) {
         commitPendingRequests(
-            table, entry.getKey().branch(), uncommitted, last.jobId(), last.operatorId());
+            table, groupKey.branch(), uncommitted, groupKey.jobId(), groupKey.operatorId());
       }
     }
   }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java
index 08b755fe14a8..45a2961b62a8 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java
@@ -33,11 +33,6 @@ class TableKey {
     this.branch = branch;
   }

-  TableKey(DynamicCommittable committable) {
-    this.tableName = committable.key().tableName();
-    this.branch = committable.key().branch();
-  }
-
   String tableName() {
     return tableName;
   }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 4cc27151b094..719b22ad1379 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -326,6 +326,112 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
         .build());
   }

+  @Test
+  void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exception {
+    // Reproduces the duplication bug: after a stop-with-savepoint rescale, the committer's state
+    // contains a committable tagged with the previous Flink jobId alongside fresh committables
+    // tagged with the new jobId. Dedup must recognize that the previous-jobId committable's
+    // data is already present in the ancestor chain (stamped with the previous jobId) and skip it,
+    // otherwise the underlying data files are committed twice and rows are duplicated.
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String uidPrefix = "uidPrefix";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    JobID previousJobId = JobID.generate();
+    DynamicCommitterMetrics previousCommitterMetrics = new DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter previousCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            uidPrefix,
+            previousCommitterMetrics);
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    TableKey tableKey = new TableKey(TABLE1, "branch");
+    // Operator id is stable across Flink job restarts; jobId is not.
+    final String operatorId = new OperatorID().toHexString();
+    final String previousJobIdStr = previousJobId.toHexString();
+    final int previousCheckpointId = 10;
+
+    byte[][] previousManifests =
+        aggregator.writeToManifests(
+            tableKey.tableName(), WRITE_RESULT_BY_SPEC, previousCheckpointId);
+
+    DynamicCommittable previousCommittable =
+        new DynamicCommittable(
+            tableKey, previousManifests, previousJobIdStr, operatorId, previousCheckpointId);
+    previousCommitter.commit(Sets.newHashSet(new MockCommitRequest<>(previousCommittable)));
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+
+    // Simulate the savepoint-driven restart: a new Flink jobId is assigned, yet the committer
+    // state replays the previous committable tagged with previousJobIdStr and adds a fresh
+    // committable for the next checkpoint tagged with newJobIdStr.
+    JobID newJobId = JobID.generate();
+    DynamicCommitterMetrics newCommitterMetrics = new DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter newCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            uidPrefix,
+            newCommitterMetrics);
+
+    final String newJobIdStr = newJobId.toHexString();
+    final int newCheckpointId = previousCheckpointId + 1;
+
+    byte[][] previousManifestsNew =
+        aggregator.writeToManifests(
+            tableKey.tableName(), WRITE_RESULT_BY_SPEC, previousCheckpointId);
+    byte[][] newManifests =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, newCheckpointId);
+
+    CommitRequest<DynamicCommittable> replayedPreviousCommitRequest =
+        new MockCommitRequest<>(
+            new DynamicCommittable(
+                tableKey,
+                previousManifestsNew,
+                previousJobIdStr,
+                operatorId,
+                previousCheckpointId));
+    CommitRequest<DynamicCommittable> newCommitRequest =
+        new MockCommitRequest<>(
+            new DynamicCommittable(
+                tableKey, newManifests, newJobIdStr, operatorId, newCheckpointId));
+
+    newCommitter.commit(Sets.newHashSet(replayedPreviousCommitRequest, newCommitRequest));
+
+    table.refresh();
+    // Exactly two snapshots: the original one stamped with previousJobIdStr and the one added
+    // under newJobIdStr. Without the dedup fix the replayed committable would produce a third
+    // snapshot that re-adds the same data files, yielding duplicate rows.
+    assertThat(table.snapshots()).hasSize(2);
+
+    Snapshot first = Iterables.get(table.snapshots(), 0);
+    assertThat(first.summary())
+        .containsEntry("flink.job-id", previousJobIdStr)
+        .containsEntry("flink.max-committed-checkpoint-id", String.valueOf(previousCheckpointId))
+        .containsEntry("flink.operator-id", operatorId);
+
+    Snapshot second = Iterables.get(table.snapshots(), 1);
+    assertThat(second.summary())
+        .containsEntry("flink.job-id", newJobIdStr)
+        .containsEntry("flink.max-committed-checkpoint-id", String.valueOf(newCheckpointId))
+        .containsEntry("flink.operator-id", operatorId);
+  }
+
   @Test
   void testCommitDeleteInDifferentFormatVersion() throws Exception {
     Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));

From 76b5f0742ebd92115f0cc95ca99c64323ad3dddf Mon Sep 17 00:00:00 2001
From: lrsb
Date: Tue, 21 Apr 2026 17:55:41 +0200
Subject: [PATCH 2/4] Flink: Fix flaky testTableBranchAtomicCommitForAppendOnlyData

table.snapshots() does not guarantee iteration order, so asserting that
its first and second elements are the branch1 and branch2 snapshots is
order dependent. Resolve each snapshot through its branch ref instead.
---
 .../iceberg/flink/sink/dynamic/TestDynamicCommitter.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 719b22ad1379..cafd50134ed8 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -618,8 +618,7 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {

     // Two committables, one for each snapshot / table / branch.
     assertThat(table.snapshots()).hasSize(2);
-    Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
-    assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+    Snapshot snapshot1 = table.snapshot(table.refs().get("branch1").snapshotId());
     assertThat(snapshot1.summary())
         .containsAllEntriesOf(
             ImmutableMap.<String, String>builder()
@@ -637,8 +636,7 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
             .put("total-records", "66")
             .build());

-    Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
-    assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+    Snapshot snapshot2 = table.snapshot(table.refs().get("branch2").snapshotId());
     assertThat(snapshot2.summary())
         .containsAllEntriesOf(
             ImmutableMap.<String, String>builder()

From 31d6b24a744b3ef428283910ad6162c757c778d3 Mon Sep 17 00:00:00 2001
From: lrsb
Date: Wed, 22 Apr 2026 16:01:26 +0200
Subject: [PATCH 3/4] Flink: Cache table ancestors per TableKey in DynamicCommitter

Group commit requests by table and branch first and by
(jobId, operatorId) within each table, so that the table is loaded and
its ancestor snapshots are materialized once per TableKey instead of
once per commit group.
---
 .../flink/sink/dynamic/DynamicCommitter.java  | 68 ++++++++------
 ...ommitGroupKey.java => JobOperatorKey.java} | 27 ++----
 .../sink/dynamic/TestDynamicCommitter.java    | 19 +---
 .../sink/dynamic/TestDynamicIcebergSink.java  | 93 +++++++++++++++++++
 4 files changed, 143 insertions(+), 64 deletions(-)
 rename flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/{CommitGroupKey.java => JobOperatorKey.java} (67%)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index 366412c5b939..5944927a43c1 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -119,28 +119,43 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
     DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the
     1.11 release first to migrate their state to a single commit request per checkpoint. */
-    Map<CommitGroupKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
+    Map<TableKey, Map<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>>
         commitRequestMap = Maps.newHashMap();
     for (CommitRequest<DynamicCommittable> request : commitRequests) {
-      NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> committables =
-          commitRequestMap.computeIfAbsent(
-              new CommitGroupKey(request.getCommittable()), unused -> Maps.newTreeMap());
-      committables
-          .computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList())
+      DynamicCommittable committable = request.getCommittable();
+      commitRequestMap
+          .computeIfAbsent(committable.key(), unused -> Maps.newHashMap())
+          .computeIfAbsent(new JobOperatorKey(committable), unused -> Maps.newTreeMap())
+          .computeIfAbsent(committable.checkpointId(), unused -> Lists.newArrayList())
           .add(request);
     }

-    for (Map.Entry<CommitGroupKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
-        entry : commitRequestMap.entrySet()) {
-      CommitGroupKey groupKey = entry.getKey();
-      Table table = catalog.loadTable(TableIdentifier.parse(groupKey.tableName()));
-      Snapshot latestSnapshot = table.snapshot(groupKey.branch());
-      Iterable<Snapshot> ancestors =
+    for (Map.Entry<
+            TableKey,
+            Map<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>>
+        tableEntry : commitRequestMap.entrySet()) {
+      TableKey tableKey = tableEntry.getKey();
+      Table table = catalog.loadTable(TableIdentifier.parse(tableKey.tableName()));
+      Snapshot latestSnapshot = table.snapshot(tableKey.branch());
+      List<Snapshot> ancestors =
           latestSnapshot != null
-              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+              ? Lists.newArrayList(
+                  SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot))
               : List.of();
-      long maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(ancestors, groupKey.jobId(), groupKey.operatorId());
-
-      NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> skippedCommitRequests =
-          entry.getValue().headMap(maxCommittedCheckpointId, true);
-      LOG.debug(
-          "Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests);
-      // Mark the already committed FilesCommittable(s) as finished
-      skippedCommitRequests
-          .values()
-          .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted));
-
-      NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted =
-          entry.getValue().tailMap(maxCommittedCheckpointId, false);
-      if (!uncommitted.isEmpty()) {
-        commitPendingRequests(
-            table, groupKey.branch(), uncommitted, groupKey.jobId(), groupKey.operatorId());
+
+      for (Map.Entry<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
+          jobEntry : tableEntry.getValue().entrySet()) {
+        JobOperatorKey jobKey = jobEntry.getKey();
+        long maxCommittedCheckpointId =
+            getMaxCommittedCheckpointId(ancestors, jobKey.jobId(), jobKey.operatorId());
+
+        NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> skippedCommitRequests =
+            jobEntry.getValue().headMap(maxCommittedCheckpointId, true);
+        LOG.debug(
+            "Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests);
+        // Mark the already committed FilesCommittable(s) as finished
+        skippedCommitRequests
+            .values()
+            .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted));
+
+        NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted =
+            jobEntry.getValue().tailMap(maxCommittedCheckpointId, false);
+        if (!uncommitted.isEmpty()) {
+          commitPendingRequests(
+              table, tableKey.branch(), uncommitted, jobKey.jobId(), jobKey.operatorId());
+        }
       }
     }
   }
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/JobOperatorKey.java
similarity index 67%
rename from flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java
rename to flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/JobOperatorKey.java
index 8f3aff1dfd6c..4d7c891ccd20 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CommitGroupKey.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/JobOperatorKey.java
@@ -20,27 +20,15 @@

 import java.util.Objects;

-class CommitGroupKey {
-  private final String tableName;
-  private final String branch;
+class JobOperatorKey {
   private final String jobId;
   private final String operatorId;

-  CommitGroupKey(DynamicCommittable committable) {
-    this.tableName = committable.key().tableName();
-    this.branch = committable.key().branch();
+  JobOperatorKey(DynamicCommittable committable) {
     this.jobId = committable.jobId();
     this.operatorId = committable.operatorId();
   }

-  String tableName() {
-    return tableName;
-  }
-
-  String branch() {
-    return branch;
-  }
-
   String jobId() {
     return jobId;
   }
@@ -54,17 +42,16 @@ public boolean equals(Object other) {
     if (this == other) {
       return true;
     }
-    if (!(other instanceof CommitGroupKey that)) {
+
+    if (!(other instanceof JobOperatorKey that)) {
       return false;
     }
-    return tableName.equals(that.tableName)
-        && branch.equals(that.branch)
-        && Objects.equals(jobId, that.jobId)
-        && Objects.equals(operatorId, that.operatorId);
+
+    return jobId.equals(that.jobId) && operatorId.equals(that.operatorId);
   }

   @Override
   public int hashCode() {
-    return Objects.hash(tableName, branch, jobId, operatorId);
+    return Objects.hash(jobId, operatorId);
   }
 }
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index cafd50134ed8..d2e6615458ca 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -26,6 +26,7 @@
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -327,12 +328,7 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
   }

   @Test
-  void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exception {
-    // Reproduces the duplication bug: after a stop-with-savepoint rescale, the committer's state
-    // contains a committable tagged with the previous Flink jobId alongside fresh committables
-    // tagged with the new jobId. Dedup must recognize that the previous-jobId committable's
-    // data is already present in the ancestor chain (stamped with the previous jobId) and skip it,
-    // otherwise the underlying data files are committed twice and rows are duplicated.
+  void testSkipsAlreadyCommittedDataAfterJobIdChanges() throws Exception {
     Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
     assertThat(table.snapshots()).isEmpty();

@@ -375,9 +371,6 @@ void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exceptio
     table.refresh();
     assertThat(table.snapshots()).hasSize(1);

-    // Simulate the savepoint-driven restart: a new Flink jobId is assigned, yet the committer
-    // state replays the previous committable tagged with previousJobIdStr and adds a fresh
-    // committable for the next checkpoint tagged with newJobIdStr.
     JobID newJobId = JobID.generate();
     DynamicCommitterMetrics newCommitterMetrics = new DynamicCommitterMetrics(metricGroup);
     DynamicCommitter newCommitter =
@@ -414,9 +407,6 @@ void testDedupsCommittablesTaggedWithPreviousJobIdAfterRestart() throws Exceptio
     newCommitter.commit(Sets.newHashSet(replayedPreviousCommitRequest, newCommitRequest));

     table.refresh();
-    // Exactly two snapshots: the original one stamped with previousJobIdStr and the one added
-    // under newJobIdStr. Without the dedup fix the replayed committable would produce a third
-    // snapshot that re-adds the same data files, yielding duplicate rows.
     assertThat(table.snapshots()).hasSize(2);

     Snapshot first = Iterables.get(table.snapshots(), 0);
@@ -1081,8 +1071,9 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      commitHook.beforeCommit(commitRequests);
-      super.commit(commitRequests);
+      List<CommitRequest<DynamicCommittable>> mutableRequests = Lists.newArrayList(commitRequests);
+      commitHook.beforeCommit(mutableRequests);
+      super.commit(mutableRequests);
       commitHook.afterCommit();
     }

diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 2a46f8021cca..e6480442d03a 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -35,10 +35,12 @@
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -1091,6 +1093,44 @@ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws
     assertThat(totalAddedRecords).isEqualTo(records.size());
   }

+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testSkipsAlreadyCommittedDataAfterJobIdChanges(boolean overwriteMode) throws Exception {
+    TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+    List<DynamicIcebergDataImpl> records =
+        Lists.newArrayList(
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                tableId.name(),
+                SnapshotRef.MAIN_BRANCH,
+                PartitionSpec.unpartitioned()));
+
+    DataFile seedDataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath("/path/to/seed-data-1.parquet")
+            .withFileSizeInBytes(0)
+            .withRecordCount(1)
+            .build();
+
+    executeDynamicSink(
+        records, env, true, 1, new ReplayPreviousJobIdCommittableHook(seedDataFile), overwriteMode);
+
+    Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+    Snapshot mainSnapshot =
+        StreamSupport.stream(table.snapshots().spliterator(), false)
+            .filter(
+                s ->
+                    !ReplayPreviousJobIdCommittableHook.PREVIOUS_JOB_ID.equals(
+                        s.summary().get("flink.job-id")))
+            .filter(s -> s.summary().get("flink.job-id") != null)
+            .reduce((first, second) -> second)
+            .orElseThrow();
+    assertThat(mainSnapshot.summary())
+        .containsEntry("added-data-files", "1")
+        .containsEntry("added-records", String.valueOf(records.size()));
+  }
+
   @Test
   void testCommitsOncePerTableBranchAndCheckpoint() throws Exception {
     String tableName = "t1";
@@ -1407,6 +1447,59 @@ public void beforeCommitOperation() {
     }
   }

+  /**
+   * Seeds an ancestor snapshot under a synthetic previous jobId and injects a replay committable
+   * tagged with that jobId into the main batch. The seed uses {@code table.newAppend()} directly
+   * rather than a side committer so the real committable's manifest stays intact.
+   */
+  static class ReplayPreviousJobIdCommittableHook implements CommitHook {
+    static final String PREVIOUS_JOB_ID = JobID.generate().toHexString();
+
+    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+    private static final String FLINK_JOB_ID = "flink.job-id";
+    private static final String OPERATOR_ID = "flink.operator-id";
+
+    // Static to survive Flink operator serialization.
+    private static boolean hasTriggered = false;
+
+    private final DataFile seedDataFile;
+
+    ReplayPreviousJobIdCommittableHook(DataFile seedDataFile) {
+      this.seedDataFile = seedDataFile;
+      hasTriggered = false;
+    }
+
+    @Override
+    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> requests) {
+      if (hasTriggered || requests.isEmpty()) {
+        return;
+      }
+
+      hasTriggered = true;
+      DynamicCommittable original = requests.iterator().next().getCommittable();
+
+      Table table =
+          CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(original.key().tableName()));
+      table
+          .newAppend()
+          .appendFile(seedDataFile)
+          .set(FLINK_JOB_ID, PREVIOUS_JOB_ID)
+          .set(OPERATOR_ID, original.operatorId())
+          .set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(original.checkpointId()))
+          .toBranch(original.key().branch())
+          .commit();
+
+      DynamicCommittable replayed =
+          new DynamicCommittable(
+              original.key(),
+              original.manifests(),
+              PREVIOUS_JOB_ID,
+              original.operatorId(),
+              original.checkpointId());
+      requests.add(new MockCommitRequest<>(replayed));
+    }
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {

     final String tableIdentifier;

From 5797c3f35facd3b839c061771ed9ec66118292de Mon Sep 17 00:00:00 2001
From: lrsb
Date: Thu, 23 Apr 2026 13:26:50 +0200
Subject: [PATCH 4/4] Flink: Order DynamicCommitter groups by checkpoint id

When a commit batch mixes committables from different jobIds, process
the per-(jobId, operatorId) groups in ascending order of their first
checkpoint id, so that replayed older-jobId commits land before the new
jobId's commits. Also let CommitHook.beforeCommit return the request
collection so test hooks can replace it instead of mutating it in
place.
---
 .../flink/sink/dynamic/DynamicCommitter.java  | 16 +++++++---
 .../sink/dynamic/TestDynamicCommitter.java    | 15 +++++----
 .../sink/dynamic/TestDynamicIcebergSink.java  | 32 +++++++++++++------
 3 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index 5944927a43c1..68fa30720710 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -137,14 +138,21 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
       TableKey tableKey = tableEntry.getKey();
       Table table = catalog.loadTable(TableIdentifier.parse(tableKey.tableName()));
       Snapshot latestSnapshot = table.snapshot(tableKey.branch());
-      List<Snapshot> ancestors =
+      Iterable<Snapshot> ancestors =
           latestSnapshot != null
-              ? Lists.newArrayList(
-                  SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot))
+              ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
               : List.of();

+      List<Map.Entry<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>>
+          jobEntries = Lists.newArrayList(tableEntry.getValue().entrySet());
+      // Preserve checkpoint order across groups so that older-jobId commits land before newer-jobId
+      // ones when the batch mixes committables from different jobIds (e.g. state replay after a
+      // restart). Within a (jobId, operatorId) group, checkpoint order is already guaranteed by
+      // the inner NavigableMap.
+      jobEntries.sort(Comparator.comparingLong(entry -> entry.getValue().firstKey()));
+
       for (Map.Entry<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
-          jobEntry : tableEntry.getValue().entrySet()) {
+          jobEntry : jobEntries) {
         JobOperatorKey jobKey = jobEntry.getKey();
         long maxCommittedCheckpointId =
             getMaxCommittedCheckpointId(ancestors, jobKey.jobId(), jobKey.operatorId());
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index d2e6615458ca..e1d5b831a211 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -26,7 +26,6 @@
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -992,7 +991,10 @@ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throw
   }

   interface CommitHook extends Serializable {
-    default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
+    default Collection<CommitRequest<DynamicCommittable>> beforeCommit(
+        Collection<CommitRequest<DynamicCommittable>> commitRequests) {
+      return commitRequests;
+    }

     default void beforeCommitOperation() {}
@@ -1013,11 +1015,14 @@ static class FailBeforeAndAfterCommit implements CommitHook {
     }

     @Override
-    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
+    public Collection<CommitRequest<DynamicCommittable>> beforeCommit(
+        Collection<CommitRequest<DynamicCommittable>> requests) {
       if (!failedBeforeCommit) {
         failedBeforeCommit = true;
         throw new RuntimeException("Failing before commit");
       }
+
+      return requests;
     }

     @Override
@@ -1071,9 +1076,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
     @Override
     public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         throws IOException, InterruptedException {
-      List<CommitRequest<DynamicCommittable>> mutableRequests = Lists.newArrayList(commitRequests);
-      commitHook.beforeCommit(mutableRequests);
-      super.commit(mutableRequests);
+      super.commit(commitHook.beforeCommit(commitRequests));
       commitHook.afterCommit();
     }

diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index e6480442d03a..6071f62bdc8c 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -1129,6 +1129,10 @@ void testSkipsAlreadyCommittedDataAfterJobIdChanges(boolean overwriteMode) throw
     assertThat(mainSnapshot.summary())
         .containsEntry("added-data-files", "1")
         .containsEntry("added-records", String.valueOf(records.size()));
+
+    long expectedTotalRecords = overwriteMode ? records.size() : records.size() + 1L;
+    assertThat(Long.parseLong(mainSnapshot.summary().get("total-records")))
+        .isEqualTo(expectedTotalRecords);
   }

   @Test
@@ -1426,10 +1430,13 @@ private static void resetState() {
     }

     @Override
-    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> requests) {
+    public Collection<CommitRequest<DynamicCommittable>> beforeCommit(
+        Collection<CommitRequest<DynamicCommittable>> requests) {
       if (!hasTriggered) {
         this.commitRequests.addAll(requests);
       }
+
+      return requests;
     }

     @Override
@@ -1448,9 +1455,10 @@ public void beforeCommitOperation() {
   }

   /**
-   * Seeds an ancestor snapshot under a synthetic previous jobId and injects a replay committable
-   * tagged with that jobId into the main batch. The seed uses {@code table.newAppend()} directly
-   * rather than a side committer so the real committable's manifest stays intact.
+   * Seeds an ancestor snapshot under a synthetic previous jobId and prepends a replay committable
+   * tagged with that jobId (at an earlier checkpoint, as would happen on restart-replay) to the
+   * batch. The seed uses {@code table.newAppend()} directly rather than a side committer so the
+   * real committable's manifest stays intact.
    */
   static class ReplayPreviousJobIdCommittableHook implements CommitHook {
     static final String PREVIOUS_JOB_ID = JobID.generate().toHexString();
@@ -1470,13 +1478,15 @@ static class ReplayPreviousJobIdCommittableHook implements CommitHook {
     }

     @Override
-    public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> requests) {
+    public Collection<CommitRequest<DynamicCommittable>> beforeCommit(
+        Collection<CommitRequest<DynamicCommittable>> requests) {
       if (hasTriggered || requests.isEmpty()) {
-        return;
+        return requests;
       }

       hasTriggered = true;
       DynamicCommittable original = requests.iterator().next().getCommittable();
+      long replayedCheckpointId = original.checkpointId() - 1;

       Table table =
           CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(original.key().tableName()));
@@ -1485,7 +1495,7 @@ public void beforeCommit(Collection<CommitRequest<DynamicCommittable>>
           .appendFile(seedDataFile)
           .set(FLINK_JOB_ID, PREVIOUS_JOB_ID)
           .set(OPERATOR_ID, original.operatorId())
-          .set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(original.checkpointId()))
+          .set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(replayedCheckpointId))
           .toBranch(original.key().branch())
           .commit();

@@ -1495,8 +1505,12 @@ public void beforeCommit(Collection<CommitRequest<DynamicCommittable>>
               original.manifests(),
               PREVIOUS_JOB_ID,
               original.operatorId(),
-              original.checkpointId());
-      requests.add(new MockCommitRequest<>(replayed));
+              replayedCheckpointId);
+      List<CommitRequest<DynamicCommittable>> enriched =
+          Lists.newArrayListWithCapacity(requests.size() + 1);
+      enriched.add(new MockCommitRequest<>(replayed));
+      enriched.addAll(requests);
+      return enriched;
     }
   }
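
Note on the dedup step the whole series leans on: every revision of
DynamicCommitter above calls getMaxCommittedCheckpointId, but its body
is not part of these diffs. The following is a minimal standalone
sketch of what such a lookup does, reconstructed from the
snapshot-summary keys the tests assert on ("flink.job-id",
"flink.operator-id", "flink.max-committed-checkpoint-id"); the -1
sentinel and the SnapshotLike shim are illustrative assumptions, not
the committer's actual types.

import java.util.Map;

class MaxCommittedCheckpointIdSketch {
  // Summary keys as asserted in TestDynamicCommitter above.
  static final String FLINK_JOB_ID = "flink.job-id";
  static final String OPERATOR_ID = "flink.operator-id";
  static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  // Hypothetical stand-in for org.apache.iceberg.Snapshot, reduced to its summary map.
  interface SnapshotLike {
    Map<String, String> summary();
  }

  // Walks the ancestor chain and returns the highest checkpoint id committed by the given
  // (jobId, operatorId) pair, or -1 (an assumed "nothing committed" sentinel) if none is found.
  static long getMaxCommittedCheckpointId(
      Iterable<? extends SnapshotLike> ancestors, String jobId, String operatorId) {
    long maxCommitted = -1L;
    for (SnapshotLike ancestor : ancestors) {
      Map<String, String> summary = ancestor.summary();
      if (jobId.equals(summary.get(FLINK_JOB_ID))
          && operatorId.equals(summary.get(OPERATOR_ID))) {
        String checkpointId = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
        if (checkpointId != null) {
          maxCommitted = Math.max(maxCommitted, Long.parseLong(checkpointId));
        }
      }
    }
    return maxCommitted;
  }
}

Because the patches group requests by (jobId, operatorId) before
calling this lookup, a committable replayed from a previous job is
matched against the snapshot that job already produced and is signalled
as already committed instead of re-appending its data files.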