diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 218fa2d911c8..63f5e6191193 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -408,6 +408,7 @@ public DataStreamSink append() {
                       tableCreator,
                       caseSensitive,
                       dropUnusedColumns))
+              .setParallelism(input.getParallelism())
               .uid(prefixIfNotNull(uidPrefix, "-generator"))
               .name(operatorName("generator"))
               .returns(type);
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 27b1e3d84a8c..aa92ae8ceb34 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -1196,6 +1196,33 @@ void testOperatorUidsFormat() {
             "Sink Committer: --sink");
   }
 
+  @Test
+  void testGeneratorDefaultParallelism() {
+    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    streamEnv.setParallelism(4);
+
+    DataStreamSource source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new TypeHint<>() {}));
+    source.setParallelism(8);
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix("test")
+        .append();
+
+    // Since the generator parallelism is not directly accessible via the returned DataStreamSink,
+    // inspect the stream graph to verify the generator inherits the input source parallelism.
+    int generatorParallelism =
+        streamEnv.getStreamGraph().getStreamNodes().stream()
+            .filter(node -> "test--generator".equals(node.getTransformationUID()))
+            .findFirst()
+            .map(StreamNode::getParallelism)
+            .orElseThrow(() -> new AssertionError("Generator node not found"));
+
+    assertThat(generatorParallelism).isEqualTo(source.getParallelism());
+  }
+
   private Set createSinkAndReturnUIds(String uidPrefix) {
     StreamExecutionEnvironment streamEnv =
         StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 4b5c9bef41e1..a00eba492a19 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -407,6 +407,7 @@ public DataStreamSink append() {
                       tableCreator,
                       caseSensitive,
                       dropUnusedColumns))
+              .setParallelism(input.getParallelism())
               .uid(prefixIfNotNull(uidPrefix, "-generator"))
               .name(operatorName("generator"))
               .returns(type);
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 27b1e3d84a8c..aa92ae8ceb34 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -1196,6 +1196,33 @@ void testOperatorUidsFormat() {
             "Sink Committer: --sink");
   }
 
+  @Test
+  void testGeneratorDefaultParallelism() {
+    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    streamEnv.setParallelism(4);
+
+    DataStreamSource source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new TypeHint<>() {}));
+    source.setParallelism(8);
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix("test")
+        .append();
+
+    // Since the generator parallelism is not directly accessible via the returned DataStreamSink,
+    // inspect the stream graph to verify the generator inherits the input source parallelism.
+    int generatorParallelism =
+        streamEnv.getStreamGraph().getStreamNodes().stream()
+            .filter(node -> "test--generator".equals(node.getTransformationUID()))
+            .findFirst()
+            .map(StreamNode::getParallelism)
+            .orElseThrow(() -> new AssertionError("Generator node not found"));
+
+    assertThat(generatorParallelism).isEqualTo(source.getParallelism());
+  }
+
   private Set createSinkAndReturnUIds(String uidPrefix) {
     StreamExecutionEnvironment streamEnv =
         StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 4b5c9bef41e1..a00eba492a19 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -407,6 +407,7 @@ public DataStreamSink append() {
                       tableCreator,
                       caseSensitive,
                       dropUnusedColumns))
+              .setParallelism(input.getParallelism())
               .uid(prefixIfNotNull(uidPrefix, "-generator"))
               .name(operatorName("generator"))
               .returns(type);
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 27b1e3d84a8c..aa92ae8ceb34 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -1196,6 +1196,33 @@ void testOperatorUidsFormat() {
             "Sink Committer: --sink");
   }
 
+  @Test
+  void testGeneratorDefaultParallelism() {
+    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    streamEnv.setParallelism(4);
+
+    DataStreamSource source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new TypeHint<>() {}));
+    source.setParallelism(8);
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix("test")
+        .append();
+
+    // Since the generator parallelism is not directly accessible via the returned DataStreamSink,
+    // inspect the stream graph to verify the generator inherits the input source parallelism.
+    int generatorParallelism =
+        streamEnv.getStreamGraph().getStreamNodes().stream()
+            .filter(node -> "test--generator".equals(node.getTransformationUID()))
+            .findFirst()
+            .map(StreamNode::getParallelism)
+            .orElseThrow(() -> new AssertionError("Generator node not found"));
+
+    assertThat(generatorParallelism).isEqualTo(source.getParallelism());
+  }
+
   private Set createSinkAndReturnUIds(String uidPrefix) {
     StreamExecutionEnvironment streamEnv =
         StreamExecutionEnvironment.getExecutionEnvironment();