Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand Down Expand Up @@ -119,43 +120,58 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
DynamicWriteResultAggregator. Iceberg 1.12 will remove this, and users should upgrade to the 1.11 release first
to migrate their state to a single commit request per checkpoint.
*/
Map<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> commitRequestMap =
Maps.newHashMap();
Map<TableKey, Map<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>>
commitRequestMap = Maps.newHashMap();
for (CommitRequest<DynamicCommittable> request : commitRequests) {
NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> committables =
commitRequestMap.computeIfAbsent(
new TableKey(request.getCommittable()), unused -> Maps.newTreeMap());
committables
.computeIfAbsent(request.getCommittable().checkpointId(), unused -> Lists.newArrayList())
DynamicCommittable committable = request.getCommittable();
commitRequestMap
.computeIfAbsent(committable.key(), unused -> Maps.newHashMap())
.computeIfAbsent(new JobOperatorKey(committable), unused -> Maps.newTreeMap())
.computeIfAbsent(committable.checkpointId(), unused -> Lists.newArrayList())
.add(request);
}

for (Map.Entry<TableKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>> entry :
Comment thread
lrsb marked this conversation as resolved.
commitRequestMap.entrySet()) {
Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
for (Map.Entry<
TableKey,
Map<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>>
tableEntry : commitRequestMap.entrySet()) {
TableKey tableKey = tableEntry.getKey();
Table table = catalog.loadTable(TableIdentifier.parse(tableKey.tableName()));
Snapshot latestSnapshot = table.snapshot(tableKey.branch());
Iterable<Snapshot> ancestors =
latestSnapshot != null
? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
: List.of();
long maxCommittedCheckpointId =
getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());

NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> skippedCommitRequests =
entry.getValue().headMap(maxCommittedCheckpointId, true);
LOG.debug(
"Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests);
// Mark the already committed FilesCommittable(s) as finished
skippedCommitRequests
.values()
.forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted));

NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted =
entry.getValue().tailMap(maxCommittedCheckpointId, false);
if (!uncommitted.isEmpty()) {
commitPendingRequests(
table, entry.getKey().branch(), uncommitted, last.jobId(), last.operatorId());

List<Map.Entry<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>>
jobEntries = Lists.newArrayList(tableEntry.getValue().entrySet());
// Preserve checkpoint order across groups so that older-jobId commits land before newer-jobId
// ones when the batch mixes committables from different jobIds (e.g. state replay after a
// restart). Within a (jobId, operatorId) group, checkpoint order is already guaranteed by
// the inner NavigableMap.
jobEntries.sort(Comparator.comparingLong(entry -> entry.getValue().firstKey()));

for (Map.Entry<JobOperatorKey, NavigableMap<Long, List<CommitRequest<DynamicCommittable>>>>
jobEntry : jobEntries) {
JobOperatorKey jobKey = jobEntry.getKey();
long maxCommittedCheckpointId =
getMaxCommittedCheckpointId(ancestors, jobKey.jobId(), jobKey.operatorId());

NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> skippedCommitRequests =
jobEntry.getValue().headMap(maxCommittedCheckpointId, true);
LOG.debug(
"Skipping {} commit requests: {}", skippedCommitRequests.size(), skippedCommitRequests);
// Mark the already committed FilesCommittable(s) as finished
skippedCommitRequests
.values()
.forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted));

NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted =
jobEntry.getValue().tailMap(maxCommittedCheckpointId, false);
if (!uncommitted.isEmpty()) {
commitPendingRequests(
table, tableKey.branch(), uncommitted, jobKey.jobId(), jobKey.operatorId());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink.dynamic;

import java.util.Objects;

class JobOperatorKey {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe SinkOperatorKey?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think JobOperatorKey is better as it contains jobId and operatorId. SinkOperatorKey reads like a key that identifies just the operator within the sink, which would imply only operatorId.

private final String jobId;
private final String operatorId;

JobOperatorKey(DynamicCommittable committable) {
this.jobId = committable.jobId();
this.operatorId = committable.operatorId();
}

String jobId() {
return jobId;
}

String operatorId() {
return operatorId;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (!(other instanceof JobOperatorKey that)) {
return false;
}

return jobId.equals(that.jobId) && operatorId.equals(that.operatorId);
}

@Override
public int hashCode() {
return Objects.hash(jobId, operatorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ class TableKey {
this.branch = branch;
}

TableKey(DynamicCommittable committable) {
this.tableName = committable.key().tableName();
this.branch = committable.key().branch();
}

String tableName() {
return tableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,101 @@ void testSkipsCommitRequestsForPreviousCheckpoints() throws Exception {
.build());
}

// Verifies that after a Flink job restart (new jobId), replayed commit requests from the
// previous jobId for an already-committed checkpoint are detected as committed and skipped,
// while the new jobId's commit for a later checkpoint still lands. This exercises the
// per-(jobId, operatorId) grouping in DynamicCommitter.commit: without it, mixing committables
// from two jobIds in one batch could re-commit or drop data.
@Test
void testSkipsAlreadyCommittedDataAfterJobIdChanges() throws Exception {
Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
// Table starts empty, so every snapshot asserted below is created by this test.
assertThat(table.snapshots()).isEmpty();

boolean overwriteMode = false;
int workerPoolSize = 1;
String uidPrefix = "uidPrefix";
UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
// Committer that simulates the job BEFORE the restart.
JobID previousJobId = JobID.generate();
DynamicCommitterMetrics previousCommitterMetrics = new DynamicCommitterMetrics(metricGroup);
DynamicCommitter previousCommitter =
new DynamicCommitter(
CATALOG_EXTENSION.catalog(),
Maps.newHashMap(),
overwriteMode,
workerPoolSize,
uidPrefix,
previousCommitterMetrics);

DynamicWriteResultAggregator aggregator =
new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
OneInputStreamOperatorTestHarness aggregatorHarness =
new OneInputStreamOperatorTestHarness(aggregator);
aggregatorHarness.open();

TableKey tableKey = new TableKey(TABLE1, "branch");
// Operator id is stable across Flink job restarts, jobId is not.
final String operatorId = new OperatorID().toHexString();
final String previousJobIdStr = previousJobId.toHexString();
final int previousCheckpointId = 10;

byte[][] previousManifests =
aggregator.writeToManifests(
tableKey.tableName(), WRITE_RESULT_BY_SPEC, previousCheckpointId);

// First commit under the old jobId; this records checkpoint 10 in the snapshot summary.
DynamicCommittable previousCommittable =
new DynamicCommittable(
tableKey, previousManifests, previousJobIdStr, operatorId, previousCheckpointId);
previousCommitter.commit(Sets.newHashSet(new MockCommitRequest<>(previousCommittable)));

table.refresh();
assertThat(table.snapshots()).hasSize(1);

// Committer that simulates the job AFTER the restart (fresh jobId, same operator id).
JobID newJobId = JobID.generate();
DynamicCommitterMetrics newCommitterMetrics = new DynamicCommitterMetrics(metricGroup);
DynamicCommitter newCommitter =
new DynamicCommitter(
CATALOG_EXTENSION.catalog(),
Maps.newHashMap(),
overwriteMode,
workerPoolSize,
uidPrefix,
newCommitterMetrics);

final String newJobIdStr = newJobId.toHexString();
final int newCheckpointId = previousCheckpointId + 1;

// Re-create the old-jobId committable (as Flink would replay it from restored state),
// plus a genuinely new committable for the next checkpoint under the new jobId.
byte[][] previousManifestsNew =
aggregator.writeToManifests(
tableKey.tableName(), WRITE_RESULT_BY_SPEC, previousCheckpointId);
byte[][] newManifests =
aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, newCheckpointId);

CommitRequest<DynamicCommittable> replayedPreviousCommitRequest =
new MockCommitRequest<>(
new DynamicCommittable(
tableKey,
previousManifestsNew,
previousJobIdStr,
operatorId,
previousCheckpointId));
CommitRequest<DynamicCommittable> newCommitRequest =
new MockCommitRequest<>(
new DynamicCommittable(
tableKey, newManifests, newJobIdStr, operatorId, newCheckpointId));

// One batch containing both jobIds: the replayed request must be skipped, the new one committed.
newCommitter.commit(Sets.newHashSet(replayedPreviousCommitRequest, newCommitRequest));

table.refresh();
// Exactly one additional snapshot: the replayed checkpoint-10 data was NOT committed again.
assertThat(table.snapshots()).hasSize(2);

Snapshot first = Iterables.get(table.snapshots(), 0);
assertThat(first.summary())
.containsEntry("flink.job-id", previousJobIdStr)
.containsEntry("flink.max-committed-checkpoint-id", String.valueOf(previousCheckpointId))
.containsEntry("flink.operator-id", operatorId);

Snapshot second = Iterables.get(table.snapshots(), 1);
assertThat(second.summary())
.containsEntry("flink.job-id", newJobIdStr)
.containsEntry("flink.max-committed-checkpoint-id", String.valueOf(newCheckpointId))
.containsEntry("flink.operator-id", operatorId);
}

@Test
void testCommitDeleteInDifferentFormatVersion() throws Exception {
Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
Expand Down Expand Up @@ -512,8 +607,7 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
// Two committables, one for each snapshot / table / branch.
assertThat(table.snapshots()).hasSize(2);

Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
Snapshot snapshot1 = table.snapshot(table.refs().get("branch1").snapshotId());
assertThat(snapshot1.summary())
.containsAllEntriesOf(
ImmutableMap.<String, String>builder()
Expand All @@ -531,8 +625,7 @@ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
.put("total-records", "66")
.build());

Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
Snapshot snapshot2 = table.snapshot(table.refs().get("branch2").snapshotId());
assertThat(snapshot2.summary())
.containsAllEntriesOf(
ImmutableMap.<String, String>builder()
Expand Down Expand Up @@ -898,7 +991,10 @@ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throw
}

interface CommitHook extends Serializable {
default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
default Collection<CommitRequest<DynamicCommittable>> beforeCommit(
Collection<CommitRequest<DynamicCommittable>> commitRequests) {
return commitRequests;
}

default void beforeCommitOperation() {}

Expand All @@ -919,11 +1015,14 @@ static class FailBeforeAndAfterCommit implements CommitHook {
}

@Override
public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
public Collection<CommitRequest<DynamicCommittable>> beforeCommit(
Collection<CommitRequest<DynamicCommittable>> requests) {
if (!failedBeforeCommit) {
failedBeforeCommit = true;
throw new RuntimeException("Failing before commit");
}

return requests;
}

@Override
Expand Down Expand Up @@ -977,8 +1076,7 @@ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
@Override
public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
throws IOException, InterruptedException {
commitHook.beforeCommit(commitRequests);
super.commit(commitRequests);
super.commit(commitHook.beforeCommit(commitRequests));
commitHook.afterCommit();
}

Expand Down
Loading
Loading