Background
The DynamicIcebergSink is designed for multi-table fan-out scenarios where a single Flink job ingests heterogeneous event streams into different Iceberg tables. A common production pattern is the wide-table model: all event types (log types) share one large Iceberg table whose schema is the union of all their columns, while each individual event type only carries a subset of those columns.
Example: A game analytics pipeline ingests 50 distinct log types into one wide Iceberg table with 500 columns. Log type A produces columns 1–40, log type B produces columns 200–230, and so on. No single event populates all 500 columns.
Problem
Without projected schema writes, DynamicIcebergSink must write a full-width row for every record. The caller is required to pad missing columns with null before handing the record to the sink. This causes significant overhead in wide-table scenarios:
1. Wasted CPU on null padding
The upstream operator must construct a RowData with the full table width (e.g., 500 fields), setting the vast majority to null. For high-throughput pipelines (millions of records/second), this is measurable CPU waste.
2. Wasted storage and I/O
Even with Parquet's efficient null encoding, writing explicit null columns in every row inflates file sizes: a Parquet column chunk is written for every schema column in each row group even when all of its values are null, and the per-column metadata overhead alone adds up at scale. In our production environment with 1-minute checkpoint intervals, this inflates both the number and the size of the small files produced per checkpoint.
3. Inflated serialization cost between Flink operators
DynamicRecord carries the full RowData across the network (Writer → Committer topology). A record with 500 columns is serialized even though only 40 of those columns are meaningful. This wastes both CPU and network bandwidth within the Flink job.
4. Poor write isolation between log types
DynamicWriter uses a WriteTarget as the writer factory key. With full-width rows, all records for the same table share one writer factory regardless of their logical log type. For use cases where log types need to be physically isolated (e.g., different sort orders per log type), this is inflexible.
Proposed Solution
Introduce an optional writeSchema field on DynamicRecord (and its internal counterpart DynamicRecordInternal). When set, writeSchema tells DynamicWriter to use only those columns when creating the Parquet TaskWriter, letting Iceberg fill the remaining optional columns with null at read time via its column projection mechanism.
```java
DynamicRecord record = new DynamicRecord(...);

// Only specify the columns this log type actually produces
Schema logTypeASchema = tableSchema.select("event_time", "user_id", "level", "score");
record.setWriteSchema(logTypeASchema);
```
The RowData inside the record is expected to match writeSchema in column count and order — exactly like the existing schema-matching behavior, but without triggering schema evolution.
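To illustrate this contract, here is a minimal sketch in plain Java with no Flink dependency; `ProjectedRow` is a hypothetical stand-in for `RowData`, used only to show that the row's arity and field order must match `writeSchema` rather than the full table schema.

```java
import java.util.List;

// Hypothetical stand-in for Flink's RowData: values are positional and must
// line up one-to-one with the projected write schema.
record ProjectedRow(List<String> writeSchemaFields, Object[] values) {
    ProjectedRow {
        if (writeSchemaFields.size() != values.length) {
            throw new IllegalArgumentException("row arity must match writeSchema");
        }
    }
}

public class WriteSchemaContract {
    public static void main(String[] args) {
        // The columns log type A actually produces, in select() order.
        List<String> writeSchema = List.of("event_time", "user_id", "level", "score");

        // The caller supplies exactly these 4 values -- no padding to 500 columns.
        ProjectedRow row = new ProjectedRow(
            writeSchema, new Object[] {1700000000L, "u123", 7, 9500});

        System.out.println("arity=" + row.values().length);
    }
}
```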
Key design points
writeSchema vs schema (record schema)
| Field | Purpose |
| --- | --- |
| schema | Used for schema matching and evolution (existing behavior, unchanged) |
| writeSchema | Used only for Parquet file writing; must be a subset of the resolved table schema |
writeSchemaColumnIds in WriteTarget
WriteTarget (the writer factory key) gains an ordered list of field IDs derived from writeSchema. This ensures records with different projected schemas get independent TaskWriter instances, which is required for correctness (each writer is created with a specific RowType).
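A rough sketch of why the field IDs belong in the key (the field names here are illustrative, not the actual WriteTarget layout): because a Java record derives equals/hashCode from its components, two projections of the same table hash to different writer-factory entries, while repeated records of the same log type reuse the existing writer.

```java
import java.util.List;

// Illustrative writer-factory key: same shape of idea as WriteTarget plus the
// proposed writeSchemaColumnIds component, not the actual class.
record WriterKey(String tableName, String branch, List<Integer> writeSchemaColumnIds) {}

public class WriterKeyDemo {
    public static void main(String[] args) {
        WriterKey logTypeA = new WriterKey("events", "main", List.of(1, 2, 3, 4));
        WriterKey logTypeB = new WriterKey("events", "main", List.of(200, 201, 202));
        WriterKey logTypeA2 = new WriterKey("events", "main", List.of(1, 2, 3, 4));

        // Same table, different projection -> distinct TaskWriter instances.
        System.out.println("distinct=" + !logTypeA.equals(logTypeB));
        // Same projection -> the existing writer is reused.
        System.out.println("reused=" + logTypeA.equals(logTypeA2));
    }
}
```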
Validation in DynamicWriter
Before creating a writer factory for a projected schema, the following are enforced:
- No required columns may be skipped — Iceberg cannot fill a required column with null at read time; skipping one would produce unreadable files.
- All partition columns must be included — the writer needs partition values to place files correctly.
- All equality-delete fields must be included — required for correct upsert semantics.
- All field IDs in writeSchema must exist in the resolved table schema — prevents silent field mismatches.
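The four rules above reduce to set-containment checks over field IDs. A self-contained sketch (method and parameter names are illustrative, not the actual DynamicWriter API):

```java
import java.util.List;
import java.util.Set;

public class ProjectionValidator {
    // Returns an error message, or null if the projected field IDs are valid.
    static String validate(
            Set<Integer> tableFieldIds,      // all field IDs in the resolved table schema
            Set<Integer> requiredFieldIds,   // IDs of required (non-nullable) columns
            Set<Integer> partitionFieldIds,  // IDs of partition source columns
            Set<Integer> equalityFieldIds,   // IDs of equality-delete fields
            List<Integer> writeSchemaFieldIds) {
        Set<Integer> projected = Set.copyOf(writeSchemaFieldIds);
        if (!tableFieldIds.containsAll(projected)) {
            return "writeSchema contains field IDs not in the table schema";
        }
        if (!projected.containsAll(requiredFieldIds)) {
            return "writeSchema skips a required column";
        }
        if (!projected.containsAll(partitionFieldIds)) {
            return "writeSchema skips a partition column";
        }
        if (!projected.containsAll(equalityFieldIds)) {
            return "writeSchema skips an equality-delete field";
        }
        return null;
    }

    public static void main(String[] args) {
        Set<Integer> table = Set.of(1, 2, 3, 4, 5);
        // Valid projection: includes required id 1 and partition id 2.
        System.out.println(validate(table, Set.of(1), Set.of(2), Set.of(), List.of(1, 2, 3)));
        // Invalid projection: skips required id 1.
        System.out.println(validate(table, Set.of(1), Set.of(2), Set.of(), List.of(2, 3)));
    }
}
```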
Aggregation in DynamicWriteResultAggregator and DynamicCommitter
WriteResults produced by different projected schemas for the same table are merged at the table level before committing. A single checkpoint produces at most one Iceberg transaction per table, regardless of how many distinct writeSchema variants were active. This is consistent with the existing behavior for non-projected writes.
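The merge amounts to grouping write results by table before committing. A simplified sketch (`TableWriteResult` and the file-path strings are illustrative stand-ins for Iceberg's WriteResult):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative stand-in for a per-writer result: the table it targets plus
// the data files it produced during the checkpoint.
record TableWriteResult(String tableName, List<String> dataFiles) {}

public class ResultAggregation {
    // Collapse results from all writeSchema variants into one entry per table,
    // so each table gets at most one commit per checkpoint.
    static Map<String, List<String>> mergeByTable(List<TableWriteResult> results) {
        return results.stream().collect(Collectors.groupingBy(
            TableWriteResult::tableName,
            Collectors.flatMapping(r -> r.dataFiles().stream(), Collectors.toList())));
    }

    public static void main(String[] args) {
        List<TableWriteResult> results = List.of(
            new TableWriteResult("events", List.of("a.parquet")),   // log type A's projection
            new TableWriteResult("events", List.of("b.parquet")),   // log type B's projection
            new TableWriteResult("metrics", List.of("c.parquet")));
        Map<String, List<String>> merged = mergeByTable(results);
        // One transaction commits both of events' files; two commits total.
        System.out.println("tables=" + merged.size()
            + " eventsFiles=" + merged.get("events").size());
    }
}
```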
Serialization Compatibility
Introducing writeSchema requires changes to several serializers:
| Serializer | Change |
| --- | --- |
| DynamicRecordInternalSerializer | Version bumped; writeSchema serialized as a length-prefixed JSON string (avoids DataOutputStream.writeUTF's 65535-byte limit for large schemas); backward-compatible deserialization preserved |
| DynamicCommittableSerializer | Version 1 → 2; V1 deserialization preserved for checkpoint recovery |
| DynamicWriteResultSerializer | Version 1 → 2; same backward-compatibility guarantee |
| WriteTarget (versioned serializer) | Version bumped to account for new writeSchemaColumnIds field |
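The length-prefixed encoding can be sketched as follows: a 4-byte length followed by raw UTF-8 bytes sidesteps `DataOutputStream.writeUTF`'s 65535-byte modified-UTF-8 limit, which a wide table's schema JSON can exceed. This is a standalone illustration of the encoding, not the serializer's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedString {
    // Write an int byte-length, then the raw UTF-8 bytes (no 64 KiB cap).
    static void write(DataOutputStream out, String json) throws IOException {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Read the length, then exactly that many bytes back into a String.
    static String read(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static String roundTrip(String json) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), json);
        return read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    }

    public static void main(String[] args) throws IOException {
        // Well over 65535 bytes -- writeUTF would throw UTFDataFormatException here.
        String schemaJson = "{\"type\":\"struct\"}".repeat(5000);
        System.out.println("roundTripOk=" + roundTrip(schemaJson).equals(schemaJson));
    }
}
```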
Performance Impact
In production testing against a wide table (500 columns, 50 log types, average 40 active columns per log type, Flink 1.20):
- Write throughput: up to 4x improvement for wide schemas
- Parquet file size: several times smaller with 1-minute checkpoint intervals (fewer column chunks per file, significantly reduced small-file problem)
API Changes
```java
// DynamicRecord.java (public API)
public class DynamicRecord {
  // existing fields unchanged ...

  /**
   * Optional projected schema for writing. When set, the RowData columns
   * must match this schema rather than the full table schema.
   * The remaining optional columns in the table schema will be read back
   * as null by Iceberg at query time.
   */
  @Nullable
  public Schema writeSchema() { ... }

  public void setWriteSchema(@Nullable Schema writeSchema) { ... }
}
```
This is a purely additive, backward-compatible change. Callers that do not set writeSchema see no behavioral difference.
Alternatives Considered
A. Null-pad at the source / upstream operator
This is what the current API forces. It moves the burden to every caller and does not reduce the serialization or storage cost — it just shifts where the nulls are constructed.
B. Separate tables per log type
This defeats the purpose of the wide-table model (unified querying, single schema governance) and creates operational overhead (hundreds of tables to manage).
C. Use Iceberg's existing read-side column projection
Iceberg's read-side column projection is well-established. This proposal applies the same concept to the write side within DynamicIcebergSink, where per-record write schema variance is the norm.
Related Issues
- DynamicWriteResultAggregator producing multiple committables per table/branch/checkpoint; the table-level result aggregation introduced by this feature also addresses that issue as a side effect.
Implementation Status
A working implementation exists and has been validated in production (Flink 1.20, ~2M records/second, 500-column wide table). We intend to extend support to Flink 1.19 and 2.0 and submit a PR. We welcome feedback on the API design, particularly whether writeSchema belongs on DynamicRecord directly or should be expressed differently.