[Flink] DynamicIcebergSink: Support projected schema writes for wide-table optimization #15925

@lintingbin

Description

Background

The DynamicIcebergSink is designed for multi-table fan-out scenarios where a single Flink job ingests heterogeneous event streams into different Iceberg tables. A common production pattern is the wide-table model: all event types (log types) share one large Iceberg table whose schema is the union of all their columns, while each individual event type only carries a subset of those columns.

Example: A game analytics pipeline ingests 50 distinct log types into one wide Iceberg table with 500 columns. Log type A produces columns 1–40, log type B produces columns 200–230, and so on. No single event populates all 500 columns.

Problem

Without projected schema writes, DynamicIcebergSink must write a full-width row for every record. The caller is required to pad missing columns with null before handing the record to the sink. This causes significant overhead in wide-table scenarios:

1. Wasted CPU on null padding

The upstream operator must construct a RowData with the full table width (e.g., 500 fields), setting the vast majority to null. For high-throughput pipelines (millions of records/second), this is measurable CPU waste.

2. Wasted storage and I/O

Even with Parquet's efficient null encoding, writing explicit null columns in every file inflates file sizes: a Parquet column chunk is written for every schema column even when all of its values are null, and the per-column metadata overhead alone adds up at scale. In our production environment with 1-minute checkpoint intervals, this produces many more, and larger, small files than necessary.

3. Inflated serialization cost between Flink operators

DynamicRecord carries the full RowData across the network (Writer → Committer topology). A record with 500 columns is serialized even though only 40 of those columns are meaningful. This wastes both CPU and network bandwidth within the Flink job.

4. Poor write isolation between log types

DynamicWriter uses a WriteTarget as the writer factory key. With full-width rows, all records for the same table share one writer factory regardless of their logical log type. For use cases where log types need to be physically isolated (e.g., different sort orders per log type), this is inflexible.

Proposed Solution

Introduce an optional writeSchema field on DynamicRecord (and its internal counterpart DynamicRecordInternal). When set, writeSchema tells DynamicWriter to use only those columns when creating the Parquet TaskWriter, letting Iceberg fill the remaining optional columns with null at read time via its column projection mechanism.

```java
DynamicRecord record = new DynamicRecord(...);
// Only specify the columns this log type actually produces
Schema logTypeASchema = tableSchema.select("event_time", "user_id", "level", "score");
record.setWriteSchema(logTypeASchema);
```

The RowData inside the record is expected to match writeSchema in column count and order — exactly like the existing schema-matching behavior, but without triggering schema evolution.
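To make that contract concrete, here is a minimal, library-free sketch (plain `Object[]` arrays standing in for `RowData`, a hypothetical simplification) contrasting today's full-width null padding with a projected row; the 500/4 column counts mirror the example above.

```java
import java.util.Arrays;

public class ProjectedRowSketch {
    static final int TABLE_WIDTH = 500;

    // Today: the caller must pad a full-width row, mostly nulls.
    static Object[] fullWidthRow(Object... values) {
        Object[] row = new Object[TABLE_WIDTH];
        System.arraycopy(values, 0, row, 0, values.length);
        return row; // 496 of 500 slots are null for a 4-column log type
    }

    // With writeSchema: the row carries exactly the projected columns, in order.
    static Object[] projectedRow(Object... values) {
        return values.clone();
    }

    public static void main(String[] args) {
        Object[] padded = fullWidthRow(1700000000L, 42L, "INFO", 99.5);
        Object[] projected = projectedRow(1700000000L, 42L, "INFO", 99.5);
        System.out.println(padded.length + " vs " + projected.length);
    }
}
```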

Key design points

writeSchema vs schema (record schema)

| Field | Purpose |
| --- | --- |
| `schema` | Used for schema matching and evolution (existing behavior, unchanged) |
| `writeSchema` | Used only for Parquet file writing; must be a subset of the resolved table schema |

writeSchemaColumnIds in WriteTarget

WriteTarget (the writer factory key) gains an ordered list of field IDs derived from writeSchema. This ensures records with different projected schemas get independent TaskWriter instances, which is required for correctness (each writer is created with a specific RowType).
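The routing idea can be sketched without any Iceberg or Flink dependencies: key the writer cache on the table plus the ordered projected field IDs, so two log types with different projections never share a writer, while identical projections reuse one. The `WriterKey` record below is a hypothetical stand-in for `WriteTarget`.

```java
import java.util.*;

public class WriterRouting {
    // Simplified stand-in for WriteTarget: table name plus ordered projected field IDs.
    record WriterKey(String table, List<Integer> writeSchemaColumnIds) {}

    private final Map<WriterKey, String> writers = new HashMap<>();

    // One writer per distinct (table, projection); the same projection reuses its writer.
    String writerFor(String table, List<Integer> fieldIds) {
        WriterKey key = new WriterKey(table, List.copyOf(fieldIds));
        return writers.computeIfAbsent(key, k -> "writer-" + writers.size());
    }

    public static void main(String[] args) {
        WriterRouting r = new WriterRouting();
        String a = r.writerFor("wide_events", List.of(1, 2, 3, 4));   // log type A
        String b = r.writerFor("wide_events", List.of(200, 201));     // log type B
        String a2 = r.writerFor("wide_events", List.of(1, 2, 3, 4));  // reuses A's writer
        System.out.println(!a.equals(b) && a.equals(a2));
    }
}
```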

Validation in DynamicWriter

Before creating a writer factory for a projected schema, the following are enforced:

  • No required columns may be skipped — Iceberg cannot fill a required column with null at read time; skipping one would produce unreadable files.
  • All partition columns must be included — the writer needs partition values to place files correctly.
  • All equality-delete fields must be included — required for correct upsert semantics.
  • All field IDs in writeSchema must exist in the resolved table schema — prevents silent field mismatches.
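The four rules reduce to set-containment checks over field IDs. Below is a self-contained sketch, with the table schema modeled as plain ID sets rather than Iceberg types (the class and its constructor are hypothetical, for illustration only):

```java
import java.util.*;

public class WriteSchemaValidator {
    // Simplified model of the resolved table schema.
    final Set<Integer> allFieldIds;
    final Set<Integer> requiredFieldIds;
    final Set<Integer> partitionFieldIds;
    final Set<Integer> equalityFieldIds;

    WriteSchemaValidator(Set<Integer> all, Set<Integer> required,
                         Set<Integer> partition, Set<Integer> equality) {
        this.allFieldIds = all;
        this.requiredFieldIds = required;
        this.partitionFieldIds = partition;
        this.equalityFieldIds = equality;
    }

    // Mirrors the four rules: no unknown IDs, and no required,
    // partition, or equality-delete column may be omitted.
    void validate(List<Integer> writeSchemaColumnIds) {
        Set<Integer> projected = new HashSet<>(writeSchemaColumnIds);
        if (!allFieldIds.containsAll(projected)) {
            throw new IllegalArgumentException("writeSchema has unknown field IDs");
        }
        if (!projected.containsAll(requiredFieldIds)) {
            throw new IllegalArgumentException("writeSchema skips a required column");
        }
        if (!projected.containsAll(partitionFieldIds)) {
            throw new IllegalArgumentException("writeSchema skips a partition column");
        }
        if (!projected.containsAll(equalityFieldIds)) {
            throw new IllegalArgumentException("writeSchema skips an equality-delete field");
        }
    }
}
```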

Aggregation in DynamicWriteResultAggregator and DynamicCommitter

WriteResults produced by different projected schemas for the same table are merged at the table level before committing. A single checkpoint produces at most one Iceberg transaction per table, regardless of how many distinct writeSchema variants were active. This is consistent with the existing behavior for non-projected writes.
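The merge step is essentially a group-by on the target table. A minimal sketch, with `WriteResult` as a hypothetical stand-in for the real class and data files as strings:

```java
import java.util.*;

public class ResultAggregation {
    // Stand-in for a WriteResult: the data files produced by one writer.
    record WriteResult(String table, List<String> dataFiles) {}

    // Merge results from all projected-schema writers of one checkpoint,
    // so each table gets exactly one commit regardless of schema variants.
    static Map<String, List<String>> mergeByTable(List<WriteResult> results) {
        Map<String, List<String>> perTable = new LinkedHashMap<>();
        for (WriteResult r : results) {
            perTable.computeIfAbsent(r.table(), t -> new ArrayList<>())
                    .addAll(r.dataFiles());
        }
        return perTable; // one entry per table => one transaction per table
    }
}
```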

Serialization Compatibility

Introducing writeSchema requires changes to several serializers:

| Serializer | Change |
| --- | --- |
| `DynamicRecordInternalSerializer` | Version bumped; `writeSchema` serialized as a length-prefixed JSON string (avoids `DataOutputStream.writeUTF`'s 65535-byte limit for large schemas); backward-compatible deserialization preserved |
| `DynamicCommittableSerializer` | Version 1 → 2; V1 deserialization preserved for checkpoint recovery |
| `DynamicWriteResultSerializer` | Version 1 → 2; same backward-compatibility guarantee |
| `WriteTarget` (versioned serializer) | Version bumped to account for the new `writeSchemaColumnIds` field |
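The length-prefix choice matters because `DataOutputStream.writeUTF` throws `UTFDataFormatException` once the encoded string exceeds 65535 bytes, which a 500-column schema's JSON can easily do. A JDK-only sketch of the round trip (method names are illustrative, not the actual serializer API):

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class SchemaStringSerde {
    // writeUTF fails past 65535 encoded bytes, so a wide-table schema's
    // JSON needs an explicit int length prefix instead.
    static void writeLongString(DataOutputStream out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    static String readLongString(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String bigSchemaJson = "x".repeat(100_000); // larger than the writeUTF limit
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeLongString(new DataOutputStream(buf), bigSchemaJson);
        String back = readLongString(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(back.equals(bigSchemaJson));
    }
}
```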

Performance Impact

In production testing against a wide table (500 columns, 50 log types, average 40 active columns per log type, Flink 1.20):

  • Write throughput: up to 4x improvement for wide schemas
  • Parquet file size: several times smaller with 1-minute checkpoint intervals (fewer column chunks per file, significantly reduced small-file problem)

API Changes

```java
// DynamicRecord.java (public API)
public class DynamicRecord {
    // existing fields unchanged ...

    /**
     * Optional projected schema for writing. When set, the RowData columns
     * must match this schema rather than the full table schema.
     * The remaining optional columns in the table schema will be read back
     * as null by Iceberg at query time.
     */
    @Nullable
    public Schema writeSchema() { ... }

    public void setWriteSchema(@Nullable Schema writeSchema) { ... }
}
```

This is a purely additive, backward-compatible change. Callers that do not set writeSchema see no behavioral difference.

Alternatives Considered

A. Null-pad at the source / upstream operator
This is what the current API forces. It moves the burden to every caller and does not reduce the serialization or storage cost — it just shifts where the nulls are constructed.

B. Separate tables per log type
This defeats the purpose of the wide-table model (unified querying, single schema governance) and creates operational overhead (hundreds of tables to manage).

C. Use Iceberg's existing read-side column projection
Iceberg's column projection is well-established on the read side, but it does nothing to reduce write-time cost. This proposal applies the same concept to the write side within DynamicIcebergSink, where per-record write-schema variance is the norm.

Related Issues

Implementation Status

A working implementation exists and has been validated in production (Flink 1.20, ~2M records/second, 500-column wide table). We intend to extend support to Flink 1.19 and 2.0 and submit a PR. We welcome feedback on the API design, particularly whether writeSchema belongs on DynamicRecord directly or should be expressed differently.
