diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 218fa2d911c8..596a3d2792ae 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.table.data.RowData; import org.apache.flink.util.OutputTag; import org.apache.iceberg.Table; @@ -79,13 +80,17 @@ public class DynamicIcebergSink private final Configuration flinkConfig; private final int cacheMaximumSize; + // Set by the builder before sinkTo() — forward writer results to union into pre-commit topology + private final transient DataStream> forwardWriteResults; + DynamicIcebergSink( CatalogLoader catalogLoader, Map snapshotProperties, String uidPrefix, Map writeProperties, Configuration flinkConfig, - int cacheMaximumSize) { + int cacheMaximumSize, + DataStream> forwardWriteResults) { this.catalogLoader = catalogLoader; this.snapshotProperties = snapshotProperties; this.uidPrefix = uidPrefix; @@ -96,6 +101,7 @@ public class DynamicIcebergSink // This is used to separate files generated by different sinks writing the same table. 
// Also used to generate the aggregator operator name this.sinkId = UUID.randomUUID().toString(); + this.forwardWriteResults = forwardWriteResults; } @SuppressWarnings("deprecation") @@ -145,7 +151,11 @@ public DataStream> addPreCommitTopology( TypeInformation> typeInformation = CommittableMessageTypeInfo.of(this::getCommittableSerializer); - return writeResults + // Union forward writer results with the shuffle writer results + DataStream> allResults = + writeResults.union(forwardWriteResults); + + return allResults .keyBy( committable -> { if (committable instanceof CommittableSummary) { @@ -168,6 +178,56 @@ public SimpleVersionedSerializer getWriteResultSerializer() return new DynamicWriteResultSerializer(); } + /** + * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the forward write path. + * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} emits committables + * downstream. The committer is never called — committing is handled by the main sink. 
+ */ + @VisibleForTesting + static class ForwardWriterSink + implements Sink, SupportsCommitter { + + private final CatalogLoader catalogLoader; + private final Map writeProperties; + private final Configuration flinkConfig; + private final int cacheMaximumSize; + + ForwardWriterSink( + CatalogLoader catalogLoader, + Map writeProperties, + Configuration flinkConfig, + int cacheMaximumSize) { + this.catalogLoader = catalogLoader; + this.writeProperties = writeProperties; + this.flinkConfig = flinkConfig; + this.cacheMaximumSize = cacheMaximumSize; + } + + @SuppressWarnings("deprecation") + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + return new DynamicWriter( + catalogLoader.loadCatalog(), + writeProperties, + flinkConfig, + cacheMaximumSize, + new DynamicWriterMetrics(context.metricGroup()), + context.getSubtaskId(), + context.getAttemptNumber()); + } + + @Override + public Committer createCommitter(CommitterInitContext context) { + throw new UnsupportedOperationException( + "WriterSink is used only for writing; committing is handled by the main sink"); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new DynamicWriteResultSerializer(); + } + } + public static class Builder { private DataStream input; private DynamicRecordGenerator generator; @@ -358,43 +418,79 @@ private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } - private DynamicIcebergSink build() { + private DynamicIcebergSink build( + SingleOutputStreamOperator converted, + DynamicRecordInternalType sideOutputType) { Preconditions.checkArgument( generator != null, "Please use withGenerator() to convert the input DataStream."); Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); - uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); - Configuration flinkConfig = readableConfig instanceof Configuration ? 
(Configuration) readableConfig : Configuration.fromMap(readableConfig.toMap()); - return instantiateSink(writeOptions, flinkConfig); + // Forward writer: chained with generator via forward edge, no data shuffle + ForwardWriterSink forwardWriterSink = + new ForwardWriterSink(catalogLoader, writeOptions, flinkConfig, cacheMaximumSize); + TypeInformation> writeResultTypeInfo = + CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new); + + DataStream> forwardWriteResults = + converted + .getSideOutput( + new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType)) + .transform( + operatorName("Forward-Writer"), + writeResultTypeInfo, + new SinkWriterOperatorFactory<>(forwardWriterSink)) + .uid(prefixIfNotNull(uidPrefix, "-forward-writer")); + + // Inject forward write results into sink — they'll be unioned in addPreCommitTopology + return instantiateSink(writeOptions, flinkConfig, forwardWriteResults); } @VisibleForTesting DynamicIcebergSink instantiateSink( - Map writeProperties, Configuration flinkWriteConf) { + Map writeProperties, + Configuration flinkWriteConf, + DataStream> forwardWriteResults) { return new DynamicIcebergSink( catalogLoader, snapshotSummary, uidPrefix, writeProperties, flinkWriteConf, - cacheMaximumSize); + cacheMaximumSize, + forwardWriteResults); } /** * Append the iceberg sink operators to write records to iceberg table. * + *

The topology splits records by distribution mode: + * + *

    + *
  • Forward records ({@code null} distributionMode) go through a forward edge to a chained + * writer, avoiding any data shuffle. + *
  • Shuffle records (non-null distributionMode) go through the standard Sink2 pipeline with + * hash/round-robin distribution. + *
+ * + * Both writers feed into a single shared pre-commit aggregator and committer, ensuring atomic + * commits across both paths. + * * @return {@link DataStreamSink} for sink. */ public DataStreamSink append() { + uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); + DynamicRecordInternalType type = new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize); - DynamicIcebergSink sink = build(); + DynamicRecordInternalType sideOutputType = + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize); + SingleOutputStreamOperator converted = input .process( @@ -412,12 +508,14 @@ public DataStreamSink append() { .name(operatorName("generator")) .returns(type); - DataStreamSink rowDataDataStreamSink = + DynamicIcebergSink sink = build(converted, sideOutputType); + + // Shuffle path: table update side output + main output → sinkTo() + DataStream shuffleInput = converted .getSideOutput( new OutputTag<>( - DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, - new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize))) + DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, sideOutputType)) .keyBy((KeySelector) DynamicRecordInternal::tableName) .map( new DynamicTableUpdateOperator( @@ -431,16 +529,19 @@ public DataStreamSink append() { .uid(prefixIfNotNull(uidPrefix, "-updater")) .name(operatorName("Updater")) .returns(type) - .union(converted) - .sinkTo(sink) + .union(converted); + + DataStreamSink result = + shuffleInput + .sinkTo(sink) // Forward write results are implicitly injected here .uid(prefixIfNotNull(uidPrefix, "-sink")); FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); if (flinkWriteConf.writeParallelism() != null) { - rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism()); + result.setParallelism(flinkWriteConf.writeParallelism()); } - return rowDataDataStreamSink; + return result; } } diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java index 9f445766083e..15b83a589382 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -34,20 +34,40 @@ public class DynamicRecord { private Schema schema; private RowData rowData; private PartitionSpec partitionSpec; - private DistributionMode distributionMode; + @Nullable private DistributionMode distributionMode; private int writeParallelism; private boolean upsertMode; @Nullable private Set equalityFields; /** - * Constructs a new DynamicRecord. + * Constructs a new DynamicRecord with forward (no shuffle) writes. * * @param tableIdentifier The target table identifier. * @param branch The target table branch. * @param schema The target table schema. * @param rowData The data matching the provided schema. * @param partitionSpec The target table {@link PartitionSpec}. - * @param distributionMode The {@link DistributionMode}. + */ + public DynamicRecord( + TableIdentifier tableIdentifier, + String branch, + Schema schema, + RowData rowData, + PartitionSpec partitionSpec) { + this(tableIdentifier, branch, schema, rowData, partitionSpec, null, -1); + } + + /** + * Constructs a new DynamicRecord. This record will be shuffled as specified by {@code + * distributionMode}. + * + * @param tableIdentifier The target table identifier. + * @param branch The target table branch. + * @param schema The target table schema. + * @param rowData The data matching the provided schema. + * @param partitionSpec The target table {@link PartitionSpec}. + * @param distributionMode The {@link DistributionMode}. {@code null} indicates forward (no + * shuffle) writes. * @param writeParallelism The number of parallel writers. 
Can be set to any value {@literal > 0}, * but will always be automatically capped by the maximum write parallelism, which is the * parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 07dfad2780f7..fc6892b2cd9e 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -37,6 +37,8 @@ class DynamicRecordProcessor extends ProcessFunction generator; private final CatalogLoader catalogLoader; private final boolean immediateUpdate; @@ -51,6 +53,7 @@ class DynamicRecordProcessor extends ProcessFunction updateStream; + private transient OutputTag forwardStream; private transient Collector collector; private transient Context context; @@ -90,9 +93,14 @@ public void open(OpenContext openContext) throws Exception { this.hashKeyGenerator = new HashKeyGenerator( cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks()); - if (immediateUpdate) { - updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns); - } else { + // Always create updater — needed for forced immediate updates on forward records + this.updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns); + // Always create forward stream tag for forward (distributionMode == null) records + this.forwardStream = + new OutputTag<>( + DYNAMIC_FORWARD_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; + if (!immediateUpdate) { updateStream = new OutputTag<>( DYNAMIC_TABLE_UPDATE_STREAM, @@ -112,6 +120,8 @@ public void processElement(T element, Context ctx, Collector newData = updater.update( 
data.tableIdentifier(), data.branch(), data.schema(), data.spec(), tableCreator); emit( - collector, data, newData.f0.resolvedTableSchema(), newData.f0.recordConverter(), - newData.f1); + newData.f1, + isForward); } else { + // Shuffled records with immediateUpdate=false go to the update side output int writerKey = hashKeyGenerator.generateKey( data, @@ -159,33 +174,38 @@ public void collect(DynamicRecord data) { } } else { emit( - collector, data, foundSchema.resolvedTableSchema(), foundSchema.recordConverter(), - foundSpec); + foundSpec, + isForward); } } private void emit( - Collector out, DynamicRecord data, Schema schema, DataConverter recordConverter, - PartitionSpec spec) { + PartitionSpec spec, + boolean forward) { RowData rowData = (RowData) recordConverter.convert(data.rowData()); - int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData); - String tableName = data.tableIdentifier().toString(); - out.collect( + // writerKey is unused in the forward path. + int writerKey = forward ? 
-1 : hashKeyGenerator.generateKey(data, schema, spec, rowData); + DynamicRecordInternal record = new DynamicRecordInternal( - tableName, + data.tableIdentifier().toString(), data.branch(), schema, rowData, spec, writerKey, data.upsertMode(), - DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema))); + DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema)); + if (forward) { + context.output(forwardStream, record); + } else { + collector.collect(record); + } } @Override diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 27b1e3d84a8c..4337a2f83c89 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -44,6 +44,8 @@ import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -213,6 +215,56 @@ public void generate(DynamicIcebergDataImpl row, Collector out) { } } + /** Generator that always emits forward (null distributionMode) records. 
*/ + private static class ForwardGenerator implements DynamicRecordGenerator { + @Override + public void generate(DynamicIcebergDataImpl row, Collector out) { + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName); + Schema schema = row.schemaProvided; + PartitionSpec spec = row.partitionSpec; + DynamicRecord dynamicRecord = + new DynamicRecord( + tableIdentifier, + row.branch, + schema, + converter(schema).toInternal(row.rowProvided), + spec); + dynamicRecord.setUpsertMode(row.upsertMode); + dynamicRecord.setEqualityFields(row.equalityFields); + out.collect(dynamicRecord); + } + } + + /** + * Generator that alternates between forward (null distributionMode) and shuffle records. Even + * indices go forward, odd indices go through shuffle. + */ + private static class MixedGenerator implements DynamicRecordGenerator { + private int count = 0; + + @Override + public void generate(DynamicIcebergDataImpl row, Collector out) { + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName); + Schema schema = row.schemaProvided; + PartitionSpec spec = row.partitionSpec; + boolean forward = (count++ % 2 == 0); + DistributionMode mode = + forward ? null : (spec.isPartitioned() ? 
DistributionMode.HASH : DistributionMode.NONE); + DynamicRecord dynamicRecord = + new DynamicRecord( + tableIdentifier, + row.branch, + schema, + converter(schema).toInternal(row.rowProvided), + spec, + mode, + 10); + dynamicRecord.setUpsertMode(row.upsertMode); + dynamicRecord.setEqualityFields(row.equalityFields); + out.collect(dynamicRecord); + } + } + private static DataFormatConverters.RowConverter converter(Schema schema) { RowType rowType = FlinkSchemaUtil.convert(schema); ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(rowType); @@ -238,6 +290,85 @@ void testWrite() throws Exception { runTest(rows); } + @Test + void testNoShuffleTopology() throws Exception { + DataStream dataStream = + env.fromData( + Collections.emptyList(), TypeInformation.of(new TypeHint() {})); + DynamicIcebergSink.forInput(dataStream) + .generator(new ForwardGenerator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(2) + .immediateTableUpdate(false) + .overwrite(false) + .append(); + + boolean generatorAndSinkChained = false; + for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) { + String vertexName = vertex.getName(); + boolean generatorInThisVertex = vertexName.contains("-generator"); + boolean sinkInThisVertex = vertexName.contains("-Forward-Writer"); + + generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex; + if (generatorAndSinkChained) { + break; + } + } + + assertThat(generatorAndSinkChained).isTrue(); + } + + @Test + void testForwardWrite() throws Exception { + runForwardWriteTest(new ForwardGenerator()); + } + + @Test + void testMixedForwardAndShuffleWrite() throws Exception { + runForwardWriteTest(new MixedGenerator()); + } + + private void runForwardWriteTest(DynamicRecordGenerator generator) + throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned()), + new 
DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned())); + + DataStream dataStream = + env.fromData(rows, TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(1); + + DynamicIcebergSink.forInput(dataStream) + .generator(generator) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(1) + .immediateTableUpdate(true) + .append(); + + env.execute(); + + verifyResults(rows); + } + @Test void testWritePartitioned() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); @@ -1170,8 +1301,9 @@ void testOperatorUidsFormat() { // pre commit topology was off, but since it is stateless, users will still be able to restore // state, but we must keep the stateful operators UUIds like the committer consistent. 
assertThat(sinkUids) - .contains( + .containsOnly( "test--sink", + "test--forward-writer", "test--generator", "test--updater", "test--sink: test--pre-commit-topology", @@ -1179,8 +1311,9 @@ void testOperatorUidsFormat() { sinkUids = createSinkAndReturnUIds(""); assertThat(sinkUids) - .contains( + .containsOnly( "--sink", + "--forward-writer", "--generator", "--updater", "--sink: --pre-commit-topology", @@ -1188,8 +1321,9 @@ void testOperatorUidsFormat() { sinkUids = createSinkAndReturnUIds(null); assertThat(sinkUids) - .contains( + .containsOnly( "--sink", + "--forward-writer", "--generator", "--updater", "--sink: --pre-commit-topology", @@ -1359,7 +1493,9 @@ static class CommitHookEnabledDynamicIcebergSink extends DynamicIcebergSink.B @Override DynamicIcebergSink instantiateSink( - Map writeProperties, Configuration flinkConfig) { + Map writeProperties, + Configuration flinkConfig, + DataStream> forwardWriteResults) { return new CommitHookDynamicIcebergSink( commitHook, CATALOG_EXTENSION.catalogLoader(), @@ -1367,7 +1503,8 @@ DynamicIcebergSink instantiateSink( "uidPrefix", writeProperties, flinkConfig, - 100); + 100, + forwardWriteResults); } } @@ -1383,14 +1520,16 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { String uidPrefix, Map writeProperties, Configuration flinkConfig, - int cacheMaximumSize) { + int cacheMaximumSize, + DataStream> forwardWritten) { super( catalogLoader, snapshotProperties, uidPrefix, writeProperties, flinkConfig, - cacheMaximumSize); + cacheMaximumSize, + forwardWritten); this.commitHook = commitHook; this.overwriteMode = new FlinkWriteConf(writeProperties, flinkConfig).overwriteMode(); } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index 4b5c9bef41e1..6f5fb945a165 100644 --- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.table.data.RowData; import org.apache.flink.util.OutputTag; import org.apache.iceberg.Table; @@ -79,13 +80,17 @@ public class DynamicIcebergSink private final Configuration flinkConfig; private final int cacheMaximumSize; + // Set by the builder before sinkTo() — forward writer results to union into pre-commit topology + private final transient DataStream> forwardWriteResults; + DynamicIcebergSink( CatalogLoader catalogLoader, Map snapshotProperties, String uidPrefix, Map writeProperties, Configuration flinkConfig, - int cacheMaximumSize) { + int cacheMaximumSize, + DataStream> forwardWriteResults) { this.catalogLoader = catalogLoader; this.snapshotProperties = snapshotProperties; this.uidPrefix = uidPrefix; @@ -96,6 +101,7 @@ public class DynamicIcebergSink // This is used to separate files generated by different sinks writing the same table. 
// Also used to generate the aggregator operator name this.sinkId = UUID.randomUUID().toString(); + this.forwardWriteResults = forwardWriteResults; } @Override @@ -144,7 +150,11 @@ public DataStream> addPreCommitTopology( TypeInformation> typeInformation = CommittableMessageTypeInfo.of(this::getCommittableSerializer); - return writeResults + // Union forward writer results with the shuffle writer results + DataStream> allResults = + writeResults.union(forwardWriteResults); + + return allResults .keyBy( committable -> { if (committable instanceof CommittableSummary) { @@ -167,6 +177,55 @@ public SimpleVersionedSerializer getWriteResultSerializer() return new DynamicWriteResultSerializer(); } + /** + * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the forward write path. + * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} emits committables + * downstream. The committer is never called — committing is handled by the main sink. + */ + @VisibleForTesting + static class ForwardWriterSink + implements Sink, SupportsCommitter { + + private final CatalogLoader catalogLoader; + private final Map writeProperties; + private final Configuration flinkConfig; + private final int cacheMaximumSize; + + ForwardWriterSink( + CatalogLoader catalogLoader, + Map writeProperties, + Configuration flinkConfig, + int cacheMaximumSize) { + this.catalogLoader = catalogLoader; + this.writeProperties = writeProperties; + this.flinkConfig = flinkConfig; + this.cacheMaximumSize = cacheMaximumSize; + } + + @Override + public SinkWriter createWriter(WriterInitContext context) { + return new DynamicWriter( + catalogLoader.loadCatalog(), + writeProperties, + flinkConfig, + cacheMaximumSize, + new DynamicWriterMetrics(context.metricGroup()), + context.getTaskInfo().getIndexOfThisSubtask(), + context.getTaskInfo().getAttemptNumber()); + } + + @Override + public Committer createCommitter(CommitterInitContext context) { + throw new 
UnsupportedOperationException( + "WriterSink is used only for writing; committing is handled by the main sink"); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new DynamicWriteResultSerializer(); + } + } + public static class Builder { private DataStream input; private DynamicRecordGenerator generator; @@ -357,43 +416,79 @@ private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } - private DynamicIcebergSink build() { + private DynamicIcebergSink build( + SingleOutputStreamOperator converted, + DynamicRecordInternalType sideOutputType) { Preconditions.checkArgument( generator != null, "Please use withGenerator() to convert the input DataStream."); Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be null"); - uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); - Configuration flinkConfig = readableConfig instanceof Configuration ? (Configuration) readableConfig : Configuration.fromMap(readableConfig.toMap()); - return instantiateSink(writeOptions, flinkConfig); + // Forward writer: chained with generator via forward edge, no data shuffle + ForwardWriterSink forwardWriterSink = + new ForwardWriterSink(catalogLoader, writeOptions, flinkConfig, cacheMaximumSize); + TypeInformation> writeResultTypeInfo = + CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new); + + DataStream> forwardWriteResults = + converted + .getSideOutput( + new OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType)) + .transform( + operatorName("Forward-Writer"), + writeResultTypeInfo, + new SinkWriterOperatorFactory<>(forwardWriterSink)) + .uid(prefixIfNotNull(uidPrefix, "-forward-writer")); + + // Inject forward write results into sink — they'll be unioned in addPreCommitTopology + return instantiateSink(writeOptions, flinkConfig, forwardWriteResults); } @VisibleForTesting DynamicIcebergSink instantiateSink( - Map writeProperties, Configuration 
flinkWriteConf) { + Map writeProperties, + Configuration flinkWriteConf, + DataStream> forwardWriteResults) { return new DynamicIcebergSink( catalogLoader, snapshotSummary, uidPrefix, writeProperties, flinkWriteConf, - cacheMaximumSize); + cacheMaximumSize, + forwardWriteResults); } /** * Append the iceberg sink operators to write records to iceberg table. * + *

The topology splits records by distribution mode: + * + *

    + *
  • Forward records ({@code null} distributionMode) go through a forward edge to a chained + * writer, avoiding any data shuffle. + *
  • Shuffle records (non-null distributionMode) go through the standard Sink2 pipeline with + * hash/round-robin distribution. + *
+ * + * Both writers feed into a single shared pre-commit aggregator and committer, ensuring atomic + * commits across both paths. + * * @return {@link DataStreamSink} for sink. */ public DataStreamSink append() { + uidPrefix = Optional.ofNullable(uidPrefix).orElse(""); + DynamicRecordInternalType type = new DynamicRecordInternalType(catalogLoader, false, cacheMaximumSize); - DynamicIcebergSink sink = build(); + DynamicRecordInternalType sideOutputType = + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize); + SingleOutputStreamOperator converted = input .process( @@ -411,12 +506,14 @@ public DataStreamSink append() { .name(operatorName("generator")) .returns(type); - DataStreamSink rowDataDataStreamSink = + DynamicIcebergSink sink = build(converted, sideOutputType); + + // Shuffle path: table update side output + main output → sinkTo() + DataStream shuffleInput = converted .getSideOutput( new OutputTag<>( - DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, - new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize))) + DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM, sideOutputType)) .keyBy((KeySelector) DynamicRecordInternal::tableName) .map( new DynamicTableUpdateOperator( @@ -430,16 +527,19 @@ public DataStreamSink append() { .uid(prefixIfNotNull(uidPrefix, "-updater")) .name(operatorName("Updater")) .returns(type) - .union(converted) - .sinkTo(sink) + .union(converted); + + DataStreamSink result = + shuffleInput + .sinkTo(sink) // Forward write results are implicitly injected here .uid(prefixIfNotNull(uidPrefix, "-sink")); FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, readableConfig); if (flinkWriteConf.writeParallelism() != null) { - rowDataDataStreamSink.setParallelism(flinkWriteConf.writeParallelism()); + result.setParallelism(flinkWriteConf.writeParallelism()); } - return rowDataDataStreamSink; + return result; } } diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java index 9f445766083e..15b83a589382 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -34,20 +34,40 @@ public class DynamicRecord { private Schema schema; private RowData rowData; private PartitionSpec partitionSpec; - private DistributionMode distributionMode; + @Nullable private DistributionMode distributionMode; private int writeParallelism; private boolean upsertMode; @Nullable private Set equalityFields; /** - * Constructs a new DynamicRecord. + * Constructs a new DynamicRecord with forward (no shuffle) writes. * * @param tableIdentifier The target table identifier. * @param branch The target table branch. * @param schema The target table schema. * @param rowData The data matching the provided schema. * @param partitionSpec The target table {@link PartitionSpec}. - * @param distributionMode The {@link DistributionMode}. + */ + public DynamicRecord( + TableIdentifier tableIdentifier, + String branch, + Schema schema, + RowData rowData, + PartitionSpec partitionSpec) { + this(tableIdentifier, branch, schema, rowData, partitionSpec, null, -1); + } + + /** + * Constructs a new DynamicRecord. This record will be shuffled as specified by {@code + * distributionMode}. + * + * @param tableIdentifier The target table identifier. + * @param branch The target table branch. + * @param schema The target table schema. + * @param rowData The data matching the provided schema. + * @param partitionSpec The target table {@link PartitionSpec}. + * @param distributionMode The {@link DistributionMode}. {@code null} indicates forward (no + * shuffle) writes. * @param writeParallelism The number of parallel writers. 
Can be set to any value {@literal > 0}, * but will always be automatically capped by the maximum write parallelism, which is the * parallelism of the sink. Set to Integer.MAX_VALUE for always using the maximum available diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 07dfad2780f7..fc6892b2cd9e 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -37,6 +37,8 @@ class DynamicRecordProcessor extends ProcessFunction generator; private final CatalogLoader catalogLoader; private final boolean immediateUpdate; @@ -51,6 +53,7 @@ class DynamicRecordProcessor extends ProcessFunction updateStream; + private transient OutputTag forwardStream; private transient Collector collector; private transient Context context; @@ -90,9 +93,14 @@ public void open(OpenContext openContext) throws Exception { this.hashKeyGenerator = new HashKeyGenerator( cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks()); - if (immediateUpdate) { - updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns); - } else { + // Always create updater — needed for forced immediate updates on forward records + this.updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns); + // Always create forward stream tag for forward (distributionMode == null) records + this.forwardStream = + new OutputTag<>( + DYNAMIC_FORWARD_STREAM, + new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)) {}; + if (!immediateUpdate) { updateStream = new OutputTag<>( DYNAMIC_TABLE_UPDATE_STREAM, @@ -112,6 +120,8 @@ public void processElement(T element, Context ctx, Collector newData = updater.update( data.tableIdentifier(), 
data.branch(), data.schema(), data.spec(), tableCreator); emit( - collector, data, newData.f0.resolvedTableSchema(), newData.f0.recordConverter(), - newData.f1); + newData.f1, + isForward); } else { + // Shuffled records with immediateUpdate=false go to the update side output int writerKey = hashKeyGenerator.generateKey( data, @@ -159,33 +174,38 @@ public void collect(DynamicRecord data) { } } else { emit( - collector, data, foundSchema.resolvedTableSchema(), foundSchema.recordConverter(), - foundSpec); + foundSpec, + isForward); } } private void emit( - Collector out, DynamicRecord data, Schema schema, DataConverter recordConverter, - PartitionSpec spec) { + PartitionSpec spec, + boolean forward) { RowData rowData = (RowData) recordConverter.convert(data.rowData()); - int writerKey = hashKeyGenerator.generateKey(data, schema, spec, rowData); - String tableName = data.tableIdentifier().toString(); - out.collect( + // writerKey is unused in the forward path. + int writerKey = forward ? -1 : hashKeyGenerator.generateKey(data, schema, spec, rowData); + DynamicRecordInternal record = new DynamicRecordInternal( - tableName, + data.tableIdentifier().toString(), data.branch(), schema, rowData, spec, writerKey, data.upsertMode(), - DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema))); + DynamicSinkUtil.getEqualityFieldIds(data.equalityFields(), schema)); + if (forward) { + context.output(forwardStream, record); + } else { + collector.collect(record); + } } @Override diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 27b1e3d84a8c..2a46f8021cca 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -43,7 +43,10 @@ import 
org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -213,6 +216,56 @@ public void generate(DynamicIcebergDataImpl row, Collector out) { } } + /** Generator that always emits forward (null distributionMode) records. */ + private static class ForwardGenerator implements DynamicRecordGenerator { + @Override + public void generate(DynamicIcebergDataImpl row, Collector out) { + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName); + Schema schema = row.schemaProvided; + PartitionSpec spec = row.partitionSpec; + DynamicRecord dynamicRecord = + new DynamicRecord( + tableIdentifier, + row.branch, + schema, + converter(schema).toInternal(row.rowProvided), + spec); + dynamicRecord.setUpsertMode(row.upsertMode); + dynamicRecord.setEqualityFields(row.equalityFields); + out.collect(dynamicRecord); + } + } + + /** + * Generator that alternates between forward (null distributionMode) and shuffle records. Even + * indices go forward, odd indices go through shuffle. + */ + private static class MixedGenerator implements DynamicRecordGenerator { + private int count = 0; + + @Override + public void generate(DynamicIcebergDataImpl row, Collector out) { + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, row.tableName); + Schema schema = row.schemaProvided; + PartitionSpec spec = row.partitionSpec; + boolean forward = (count++ % 2 == 0); + DistributionMode mode = + forward ? 
null : (spec.isPartitioned() ? DistributionMode.HASH : DistributionMode.NONE); + DynamicRecord dynamicRecord = + new DynamicRecord( + tableIdentifier, + row.branch, + schema, + converter(schema).toInternal(row.rowProvided), + spec, + mode, + 10); + dynamicRecord.setUpsertMode(row.upsertMode); + dynamicRecord.setEqualityFields(row.equalityFields); + out.collect(dynamicRecord); + } + } + private static DataFormatConverters.RowConverter converter(Schema schema) { RowType rowType = FlinkSchemaUtil.convert(schema); ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(rowType); @@ -238,6 +291,96 @@ void testWrite() throws Exception { runTest(rows); } + @Test + void testNoShuffleTopology() throws Exception { + DataStream dataStream = + env.fromData( + Collections.emptyList(), TypeInformation.of(new TypeHint() {})); + DynamicIcebergSink.forInput(dataStream) + .generator(new ForwardGenerator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(2) + .immediateTableUpdate(false) + .overwrite(false) + .append(); + + boolean generatorAndSinkChained = false; + for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) { + boolean generatorInThisVertex = false; + boolean sinkInThisVertex = false; + for (OperatorIDPair operatorID : vertex.getOperatorIDs()) { + String uid = operatorID.getUserDefinedOperatorUid(); + if (uid == null) { + continue; + } + + if (uid.endsWith("-forward-writer")) { + sinkInThisVertex = true; + } else if (uid.endsWith("-generator")) { + generatorInThisVertex = true; + } + } + + generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex; + if (generatorAndSinkChained) { + break; + } + } + + assertThat(generatorAndSinkChained).isTrue(); + } + + @Test + void testForwardWrite() throws Exception { + runForwardWriteTest(new ForwardGenerator()); + } + + @Test + void testMixedForwardAndShuffleWrite() throws Exception { + runForwardWriteTest(new MixedGenerator()); + } + + private void 
runForwardWriteTest(DynamicRecordGenerator generator) + throws Exception { + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + SimpleDataUtil.SCHEMA, + "t1", + SnapshotRef.MAIN_BRANCH, + PartitionSpec.unpartitioned())); + + DataStream dataStream = + env.fromData(rows, TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(1); + + DynamicIcebergSink.forInput(dataStream) + .generator(generator) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(1) + .immediateTableUpdate(true) + .append(); + + env.execute(); + + verifyResults(rows); + } + @Test void testWritePartitioned() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("id", 10).build(); @@ -1170,8 +1313,9 @@ void testOperatorUidsFormat() { // pre commit topology was off, but since it is stateless, users will still be able to restore // state, but we must keep the stateful operators UUIds like the committer consistent. 
assertThat(sinkUids) - .contains( + .containsOnly( "test--sink", + "test--forward-writer", "test--generator", "test--updater", "test--sink: test--pre-commit-topology", @@ -1179,8 +1323,9 @@ sinkUids = createSinkAndReturnUIds(""); assertThat(sinkUids) - .contains( + .containsOnly( "--sink", + "--forward-writer", "--generator", "--updater", "--sink: --pre-commit-topology", @@ -1188,8 +1333,9 @@ sinkUids = createSinkAndReturnUIds(null); assertThat(sinkUids) - .contains( + .containsOnly( "--sink", + "--forward-writer", "--generator", "--updater", "--sink: --pre-commit-topology", @@ -1359,7 +1505,9 @@ static class CommitHookEnabledDynamicIcebergSink extends DynamicIcebergSink.B @Override DynamicIcebergSink instantiateSink( - Map writeProperties, Configuration flinkConfig) { + Map writeProperties, + Configuration flinkConfig, + DataStream> forwardWriteResults) { return new CommitHookDynamicIcebergSink( commitHook, CATALOG_EXTENSION.catalogLoader(), @@ -1367,7 +1515,8 @@ DynamicIcebergSink instantiateSink( "uidPrefix", writeProperties, flinkConfig, - 100); + 100, + forwardWriteResults); } } @@ -1383,14 +1532,16 @@ static class CommitHookDynamicIcebergSink extends DynamicIcebergSink { String uidPrefix, Map writeProperties, Configuration flinkConfig, - int cacheMaximumSize) { + int cacheMaximumSize, + DataStream> forwardWriteResults) { super( catalogLoader, snapshotProperties, uidPrefix, writeProperties, flinkConfig, - cacheMaximumSize); + cacheMaximumSize, + forwardWriteResults); this.commitHook = commitHook; this.overwriteMode = new FlinkWriteConf(writeProperties, flinkConfig).overwriteMode(); }