diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java index 5e824773f4bf..68fa30720710 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -119,43 +120,58 @@ public void commit(Collection> commitRequests) DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the 1.11 release first to migrate their state to a single commit request per checkpoint. */ - Map>>> commitRequestMap = - Maps.newHashMap(); + Map>>>> + commitRequestMap = Maps.newHashMap(); for (CommitRequest request : commitRequests) { - NavigableMap>> committables = - commitRequestMap.computeIfAbsent( - new TableKey(request.getCommittable()), unused -> Maps.newTreeMap()); - committables - .computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList()) + DynamicCommittable committable = request.getCommittable(); + commitRequestMap + .computeIfAbsent(committable.key(), unused -> Maps.newHashMap()) + .computeIfAbsent(new JobOperatorKey(committable), unused -> Maps.newTreeMap()) + .computeIfAbsent(committable.checkpointId(), unused -> Lists.newArrayList()) .add(request); } - for (Map.Entry>>> entry : - commitRequestMap.entrySet()) { - Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName())); - DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable(); - Snapshot latestSnapshot = table.snapshot(entry.getKey().branch()); + for (Map.Entry< + TableKey, + Map>>>> + tableEntry : commitRequestMap.entrySet()) { + TableKey tableKey = tableEntry.getKey(); + Table table = catalog.loadTable(TableIdentifier.parse(tableKey.tableName())); + Snapshot latestSnapshot = table.snapshot(tableKey.branch()); Iterable ancestors = latestSnapshot != null ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot) : List.of(); - long maxCommittedCheckpointId = - getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId()); - - NavigableMap>> skippedCommitRequests = - entry.getValue().headMap(maxCommittedCheckpointId, true); - LOG.debug( - "Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests); - // Mark the already committed FilesCommittable(s) as finished - skippedCommitRequests - .values() - .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); - - NavigableMap>> uncommitted = - entry.getValue().tailMap(maxCommittedCheckpointId, false); - if (!uncommitted.isEmpty()) { - commitPendingRequests( - table, entry.getKey().branch(), uncommitted, last.jobId(), last.operatorId()); + + List>>>> + jobEntries = Lists.newArrayList(tableEntry.getValue().entrySet()); + // Preserve checkpoint order across groups so that older-jobId commits land before newer-jobId + // ones when the batch mixes committables from different jobIds (e.g. state replay after a + // restart). Within a (jobId, operatorId) group, checkpoint order is already guaranteed by + // the inner NavigableMap. + jobEntries.sort(Comparator.comparingLong(entry -> entry.getValue().firstKey())); + + for (Map.Entry>>> + jobEntry : jobEntries) { + JobOperatorKey jobKey = jobEntry.getKey(); + long maxCommittedCheckpointId = + getMaxCommittedCheckpointId(ancestors, jobKey.jobId(), jobKey.operatorId()); + + NavigableMap>> skippedCommitRequests = + jobEntry.getValue().headMap(maxCommittedCheckpointId, true); + LOG.debug( + "Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests); + // Mark the already committed FilesCommittable(s) as finished + skippedCommitRequests + .values() + .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); + + NavigableMap>> uncommitted = + jobEntry.getValue().tailMap(maxCommittedCheckpointId, false); + if (!uncommitted.isEmpty()) { + commitPendingRequests( + table, tableKey.branch(), uncommitted, jobKey.jobId(), jobKey.operatorId()); + } } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/JobOperatorKey.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/JobOperatorKey.java new file mode 100644 index 000000000000..4d7c891ccd20 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/JobOperatorKey.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.Objects; + +class JobOperatorKey { + private final String jobId; + private final String operatorId; + + JobOperatorKey(DynamicCommittable committable) { + this.jobId = committable.jobId(); + this.operatorId = committable.operatorId(); + } + + String jobId() { + return jobId; + } + + String operatorId() { + return operatorId; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof JobOperatorKey that)) { + return false; + } + + return jobId.equals(that.jobId) && operatorId.equals(that.operatorId); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, operatorId); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java index 08b755fe14a8..45a2961b62a8 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java @@ -33,11 +33,6 @@ class TableKey { this.branch = branch; } - TableKey(DynamicCommittable committable) { - this.tableName = committable.key().tableName(); - this.branch = committable.key().branch(); - } - String tableName() { return tableName; } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java index 4cc27151b094..e1d5b831a211 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java @@ -326,6 +326,101 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception { .build()); } + @Test + void testSkipsAlreadyCommittedDataAfterJobIdChanges() throws Exception { + Table table = catalog.loadTable(TableIdentifier.of(TABLE1)); + assertThat(table.snapshots()).isEmpty(); + + boolean overwriteMode = false; + int workerPoolSize = 1; + String uidPrefix = "uidPrefix"; + UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup(); + JobID previousJobId = JobID.generate(); + DynamicCommitterMetrics previousCommitterMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter previousCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + uidPrefix, + previousCommitterMetrics); + + DynamicWriteResultAggregator aggregator = + new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize); + OneInputStreamOperatorTestHarness aggregatorHarness = + new OneInputStreamOperatorTestHarness(aggregator); + aggregatorHarness.open(); + + TableKey tableKey = new TableKey(TABLE1, "branch"); + // Operator id is stable across Flink job restarts, jobId is not. + final String operatorId = new OperatorID().toHexString(); + final String previousJobIdStr = previousJobId.toHexString(); + final int previousCheckpointId = 10; + + byte[][] previousManifests = + aggregator.writeToManifests( + tableKey.tableName(), WRITE_RESULT_BY_SPEC, previousCheckpointId); + + DynamicCommittable previousCommittable = + new DynamicCommittable( + tableKey, previousManifests, previousJobIdStr, operatorId, previousCheckpointId); + previousCommitter.commit(Sets.newHashSet(new MockCommitRequest<>(previousCommittable))); + + table.refresh(); + assertThat(table.snapshots()).hasSize(1); + + JobID newJobId = JobID.generate(); + DynamicCommitterMetrics newCommitterMetrics = new DynamicCommitterMetrics(metricGroup); + DynamicCommitter newCommitter = + new DynamicCommitter( + CATALOG_EXTENSION.catalog(), + Maps.newHashMap(), + overwriteMode, + workerPoolSize, + uidPrefix, + newCommitterMetrics); + + final String newJobIdStr = newJobId.toHexString(); + final int newCheckpointId = previousCheckpointId + 1; + + byte[][] previousManifestsNew = + aggregator.writeToManifests( + tableKey.tableName(), WRITE_RESULT_BY_SPEC, previousCheckpointId); + byte[][] newManifests = + aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, newCheckpointId); + + CommitRequest replayedPreviousCommitRequest = + new MockCommitRequest<>( + new DynamicCommittable( + tableKey, + previousManifestsNew, + previousJobIdStr, + operatorId, + previousCheckpointId)); + CommitRequest newCommitRequest = + new MockCommitRequest<>( + new DynamicCommittable( + tableKey, newManifests, newJobIdStr, operatorId, newCheckpointId)); + + newCommitter.commit(Sets.newHashSet(replayedPreviousCommitRequest, newCommitRequest)); + + table.refresh(); + assertThat(table.snapshots()).hasSize(2); + + Snapshot first = Iterables.get(table.snapshots(), 0); + assertThat(first.summary()) + .containsEntry("flink.job-id", previousJobIdStr) + .containsEntry("flink.max-committed-checkpoint-id", String.valueOf(previousCheckpointId)) + .containsEntry("flink.operator-id", operatorId); + + Snapshot second = Iterables.get(table.snapshots(), 1); + assertThat(second.summary()) + .containsEntry("flink.job-id", newJobIdStr) + .containsEntry("flink.max-committed-checkpoint-id", String.valueOf(newCheckpointId)) + .containsEntry("flink.operator-id", operatorId); + } + @Test void testCommitDeleteInDifferentFormatVersion() throws Exception { Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1)); @@ -512,8 +607,7 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { // Two committables, one for each snapshot / table / branch. assertThat(table.snapshots()).hasSize(2); - Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null); - assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId()); + Snapshot snapshot1 = table.snapshot(table.refs().get("branch1").snapshotId()); assertThat(snapshot1.summary()) .containsAllEntriesOf( ImmutableMap.builder() @@ -531,8 +625,7 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception { .put("total-records", "66") .build()); - Snapshot snapshot2 = Iterables.get(table.snapshots(), 1); - assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId()); + Snapshot snapshot2 = table.snapshot(table.refs().get("branch2").snapshotId()); assertThat(snapshot2.summary()) .containsAllEntriesOf( ImmutableMap.builder() @@ -898,7 +991,10 @@ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throw } interface CommitHook extends Serializable { - default void beforeCommit(Collection> commitRequests) {} + default Collection> beforeCommit( + Collection> commitRequests) { + return commitRequests; + } default void beforeCommitOperation() {} @@ -919,11 +1015,14 @@ static class FailBeforeAndAfterCommit implements CommitHook { } @Override - public void beforeCommit(Collection> ignored) { + public Collection> beforeCommit( + Collection> requests) { if (!failedBeforeCommit) { failedBeforeCommit = true; throw new RuntimeException("Failing before commit"); } + + return requests; } @Override @@ -977,8 +1076,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter { @Override public void commit(Collection> commitRequests) throws IOException, InterruptedException { - commitHook.beforeCommit(commitRequests); - super.commit(commitRequests); + super.commit(commitHook.beforeCommit(commitRequests)); commitHook.afterCommit(); } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 2a46f8021cca..6071f62bdc8c 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -35,10 +35,12 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestartStrategyOptions; @@ -1091,6 +1093,48 @@ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws assertThat(totalAddedRecords).isEqualTo(records.size()); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void testSkipsAlreadyCommittedDataAfterJobIdChanges(boolean overwriteMode) throws Exception { + TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1"); + List records = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + tableId.name(), + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned())); + + DataFile seedDataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/seed-data-1.parquet") + .withFileSizeInBytes(0) + .withRecordCount(1) + .build(); + + executeDynamicSink( + records, env, true, 1, new ReplayPreviousJobIdCommittableHook(seedDataFile), overwriteMode); + + Table table = CATALOG_EXTENSION.catalog().loadTable(tableId); + + Snapshot mainSnapshot = + StreamSupport.stream(table.snapshots().spliterator(), false) + .filter( + s -> + !ReplayPreviousJobIdCommittableHook.PREVIOUS_JOB_ID.equals( + s.summary().get("flink.job-id"))) + .filter(s -> s.summary().get("flink.job-id") != null) + .reduce((first, second) -> second) + .orElseThrow(); + assertThat(mainSnapshot.summary()) + .containsEntry("added-data-files", "1") + .containsEntry("added-records", String.valueOf(records.size())); + + long expectedTotalRecords = overwriteMode ? records.size() : records.size() + 1L; + assertThat(Long.parseLong(mainSnapshot.summary().get("total-records"))) + .isEqualTo(expectedTotalRecords); + } + @Test void testCommitsOncePerTableBranchAndCheckpoint() throws Exception { String tableName = "t1"; @@ -1386,10 +1430,13 @@ private static void resetState() { } @Override - public void beforeCommit(Collection> requests) { + public Collection> beforeCommit( + Collection> requests) { if (!hasTriggered) { this.commitRequests.addAll(requests); } + + return requests; } @Override @@ -1407,6 +1454,66 @@ public void beforeCommitOperation() { } } + /** + * Seeds an ancestor snapshot under a synthetic previous jobId and prepends a replay committable + * tagged with that jobId (at an earlier checkpoint, as would happen on restart-replay) to the + * batch. The seed uses {@code table.newAppend()} directly rather than a side committer so the + * real committable's manifest stays intact. + */ + static class ReplayPreviousJobIdCommittableHook implements CommitHook { + static final String PREVIOUS_JOB_ID = JobID.generate().toHexString(); + + private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id"; + private static final String FLINK_JOB_ID = "flink.job-id"; + private static final String OPERATOR_ID = "flink.operator-id"; + + // Static to survive Flink operator serialization. + private static boolean hasTriggered = false; + + private final DataFile seedDataFile; + + ReplayPreviousJobIdCommittableHook(DataFile seedDataFile) { + this.seedDataFile = seedDataFile; + hasTriggered = false; + } + + @Override + public Collection> beforeCommit( + Collection> requests) { + if (hasTriggered || requests.isEmpty()) { + return requests; + } + + hasTriggered = true; + DynamicCommittable original = requests.iterator().next().getCommittable(); + long replayedCheckpointId = original.checkpointId() - 1; + + Table table = + CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(original.key().tableName())); + table + .newAppend() + .appendFile(seedDataFile) + .set(FLINK_JOB_ID, PREVIOUS_JOB_ID) + .set(OPERATOR_ID, original.operatorId()) + .set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(replayedCheckpointId)) + .toBranch(original.key().branch()) + .commit(); + + DynamicCommittable replayed = + new DynamicCommittable( + original.key(), + original.manifests(), + PREVIOUS_JOB_ID, + original.operatorId(), + replayedCheckpointId); + List> enriched = + Lists.newArrayListWithCapacity(requests.size() + 1); + enriched.add(new MockCommitRequest<>(replayed)); + enriched.addAll(requests); + return enriched; + } + } + private static class AppendRightBeforeCommit implements CommitHook { final String tableIdentifier;