
Flink: Add equality delete to DV conversion as a table maintenance task #15996

Open
mxm wants to merge 9 commits into apache:main from mxm:resolve-equality-deletes

Conversation

Contributor

@mxm mxm commented Apr 16, 2026

In order to get rid of equality deletes, we first need to be able to resolve existing equality delete files into positional deletes. For this PR, we opted to use deletion vectors (DVs) over regular position delete files because of their space and lookup efficiency.

The idea is to allow writers to write equality deletes like they normally do (e.g. via IcebergSink), but commit them to a staging branch first. The maintenance task monitors the staging branch, builds an index from the equality field values of the data files, converts the equality delete files into DVs, and commits everything back to the main branch, where the data can be read efficiently using only positional deletes.

This workflow will require additional changes to IcebergSink: write data to the staging branch instead of the main branch, and run the ConvertEqualityDeletes maintenance task added here in-line via its post-commit topology. I was tempted to add this right away, but decided to keep the scope limited to the maintenance task only.

The maintenance task topology looks like this:

Planner (p=1) -> Reader (p=N) -> Worker (p=N, keyed) -> DVResolver (p=1) -> DVMerger (p=N) -> Committer (p=1)
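A hedged sketch of how such a chain could be wired with the Flink DataStream API, using stand-in String records and stub functions; the real operators and record types come from this PR, so the names below are illustrative only:

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ConvertEqualityDeletesTopologySketch {

  /** Stub standing in for the real operators; it only relabels the record. */
  static final class Stub implements MapFunction<String, String> {
    private final String stage;

    Stub(String stage) {
      this.stage = stage;
    }

    @Override
    public String map(String ignored) {
      return stage;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    int n = 4; // stand-in for the task parallelism N

    DataStream<String> readCommands =
        env.fromElements("trigger").map(new Stub("read-command")).setParallelism(1); // Planner

    DataStream<String> indexCommands =
        readCommands.rebalance().map(new Stub("index-command")).setParallelism(n); // Reader

    DataStream<String> dvPositions =
        indexCommands
            .keyBy(cmd -> cmd) // keyed by the serialized equality key in the real pipeline
            .map(new Stub("dv-position")) // Worker
            .setParallelism(n);

    DataStream<String> mergeCommands =
        dvPositions.map(new Stub("dv-merge-command")).setParallelism(1); // DVResolver

    DataStream<String> mergeResults =
        mergeCommands.rebalance().map(new Stub("dv-merge-result")).setParallelism(n); // DVMerger

    mergeResults.map(new Stub("commit")).setParallelism(1); // Committer

    env.execute("convert-equality-deletes-topology-sketch");
  }
}
```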

Limitations

To keep the scope somewhat limited, I skipped these:

  • Row lineage is not preserved (can be handled in a follow-up)
  • No staging branch lifecycle management, e.g. create / clean up branch (will be added with the integration into IcebergSink)
  • DVResolver does a full manifest scan (no partition pruning)

Follow-ups

I'm planning to work on these next:

  • Integrate ConvertEqualityDeletes maintenance task as in-line compaction in IcebergSink
  • Clean up processed snapshots on the staging branch
  • Add documentation

Commits

The breakdown is as follows:

  1. 8427518 Data model and key serialization - data exchange types and key serialization
  2. 5928dc2 Operators - file reader and keyed PK index worker with buffering
  3. e5285ad DV resolution and writing - group positions by data file, merge with existing DVs
  4. 691e209 Planner - staging branch scanning, orchestration, reindex detection
  5. 4fb0608 Committer - RowDelta assembly, idempotency via snapshot properties, V3 enforcement
  6. 76ab1d9 API and integration tests - ConvertEqualityDeletes builder, end-to-end tests

These could be put into separate PRs, if needed.

Metrics

Under the maintenance metric group:

  • Planner: processedEqDeleteFileNum, processedStagingSnapshotNum, skippedNoOpCycles, reindexCount
  • Worker: indexedKeyNum, resolvedDeleteNum
  • Committer: addedDvNum, commitDurationMs, addedDataFileNum, addedDataFileSize, error

References

  1. Design document: https://docs.google.com/document/d/1Jz4Fjt-6jRmwqbgHX_u0ohuyTB9ytDzfslS7lYraIjk/edit?tab=t.5qlfbj9bk8i3 (This PR implements the "use lock to avoid conflicts" solution)
  2. Mailing list discussion: https://lists.apache.org/thread/gldqrycmo24r0vo77wzcj9s9b04rtvy7

mxm added 6 commits April 16, 2026 14:23
…rsion

Adds the data types and serialization layer for converting equality
deletes to deletion vectors (DVs) within the Flink table maintenance
framework. This is the foundation for the conversion pipeline.

Data types:
- SerializedEqualityValues: content-based key wrapper for Flink keyBy
- IndexCommand: planner-to-worker command (ADD_DATA_ROW / RESOLVE_DELETE)
- ReadCommand: planner-to-reader command (DATA_FILE / EQ_DELETE_FILE)
- DVPosition: worker output identifying a row to delete
- DVMergeCommand: resolver-to-merger command with partition context
- DVMergeResult: merger output with new and rewritten DV files
- EqualityConvertPlanResult: planner metadata for resolver and committer

Serialization:
- EqualityFieldSerializer: deterministic key serialization using
  Iceberg Conversions.toByteBuffer, prefixed with field IDs to
  support multiple equality field sets without collisions
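To make the serialization layout concrete, here is a minimal sketch of a field-ID-prefixed key encoding (a hypothetical helper for illustration, not the PR's EqualityFieldSerializer); it only relies on Iceberg's Conversions.toByteBuffer:

```
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

public final class EqualityKeySketch {
  private EqualityKeySketch() {}

  /** Serializes one equality key as [fieldId, length, valueBytes] per field, in field order. */
  static byte[] serialize(List<Integer> fieldIds, List<Type> fieldTypes, List<Object> values)
      throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream dos = new DataOutputStream(bytes)) {
      for (int i = 0; i < fieldIds.size(); i++) {
        dos.writeInt(fieldIds.get(i)); // field-ID prefix avoids collisions across field sets
        ByteBuffer buf = Conversions.toByteBuffer(fieldTypes.get(i), values.get(i));
        dos.writeInt(buf.remaining()); // remaining(), not limit(), in case position != 0
        dos.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
      }
    }
    return bytes.toByteArray();
  }
}
```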
Adds the file reader and primary key index worker for the equality
delete conversion pipeline.

- EqualityConvertReader: reads data files and equality delete files,
  projects equality fields, serializes keys, and emits IndexCommands.
  Uses FormatModelRegistry for format-agnostic file reading.

- EqualityConvertWorker: KeyedProcessFunction maintaining a shard of
  the primary key index in Flink state. Uses phase-aware buffering
  with event-time timers to guarantee that data rows are indexed
  before equality deletes are resolved against them. Detects main
  branch changes and clears stale state.
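As a rough illustration of that buffering scheme (simplified types, not the PR's EqualityConvertWorker), a keyed function can index data rows immediately and buffer delete commands until the event-time timer for their phase fires:

```
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

class PhaseAwareWorkerSketch
    extends KeyedProcessFunction<String, PhaseAwareWorkerSketch.Command, String> {

  /** Hypothetical command: index a data row, or resolve an equality delete for this key. */
  public static class Command {
    public boolean isDelete;  // true for RESOLVE_DELETE, false for ADD_DATA_ROW
    public String filePath;   // data file containing the row
    public long rowPosition;  // position of the row inside the data file
  }

  // Shard of the primary key index: data file path -> row position for the current key.
  private transient MapState<String, Long> keyIndex;
  // Deletes buffered until the watermark confirms all earlier-phase rows were indexed.
  private transient ListState<Command> bufferedDeletes;

  @Override
  public void open(Configuration parameters) { // Flink 1.x open() signature
    keyIndex =
        getRuntimeContext()
            .getMapState(new MapStateDescriptor<>("key-index", String.class, Long.class));
    bufferedDeletes =
        getRuntimeContext()
            .getListState(new ListStateDescriptor<>("buffered-deletes", Command.class));
  }

  @Override
  public void processElement(Command cmd, Context ctx, Collector<String> out) throws Exception {
    if (cmd.isDelete) {
      // Defer resolution: deletes carry a later phase timestamp than the data rows they may
      // target, so the timer only fires once all earlier data-row commands were indexed.
      bufferedDeletes.add(cmd);
      ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    } else {
      keyIndex.put(cmd.filePath, cmd.rowPosition);
    }
  }

  @Override
  public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) throws Exception {
    Iterable<Command> deletes = bufferedDeletes.get();
    if (deletes != null && deletes.iterator().hasNext()) {
      // Emit one delete position per indexed occurrence of this equality key.
      for (Map.Entry<String, Long> entry : keyIndex.entries()) {
        out.collect(entry.getKey() + "#" + entry.getValue());
      }
    }
    bufferedDeletes.clear();
  }
}
```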
Adds the DV resolver and merger for the equality delete conversion
pipeline.

- EqualityConvertDVResolver: TwoInputStreamOperator that collects
  DVPositions from workers and EqualityConvertPlanResult from the
  planner. On watermark, groups positions by data file, resolves
  partition info and existing DVs from manifests, and emits
  DVMergeCommands. Includes staging DVs in the merge to prevent
  V3 violations (two DVs for the same data file).

- EqualityConvertDVMerger: writes merged Puffin DV files. Loads
  existing DVs for in-memory merge via BaseDVFileWriter. On failure,
  emits an abort signal to prevent partial commits.
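Conceptually, the merge step is a per-data-file union of delete positions, since V3 allows at most one DV per data file. A toy sketch of that invariant (the actual code writes Puffin files via BaseDVFileWriter):

```
import java.util.Set;
import java.util.TreeSet;

final class DvMergeSketch {
  private DvMergeSketch() {}

  /** Union of the existing DV's positions and the newly resolved positions for one data file. */
  static Set<Long> mergedPositions(Set<Long> existingDvPositions, Set<Long> newlyResolved) {
    Set<Long> merged = new TreeSet<>(existingDvPositions); // keep positions ordered
    merged.addAll(newlyResolved); // a row deleted by either source stays deleted
    return merged;
  }
}
```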
Adds the planner operator that orchestrates the conversion pipeline.

Scans the staging branch for new data files and equality delete files,
then emits ReadCommands with phase-based event timestamps and
watermarks between phases to guarantee ordering:

```
  Phase 0: main data files (index refresh)
  Phase 1: equality delete files (resolution)
  Phase 2: staging data files (index update)
```

Handles main branch changes by detecting external commits or
compaction (REPLACE operations) and triggering index rebuilds.
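A hedged sketch of that phase ordering, written against a generic AbstractStreamOperator (field and method names are illustrative, not the PR's API). Each phase gets a strictly larger timestamp, and the watermark emitted after a phase tells the downstream keyed worker that every command of that phase has arrived:

```
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

class PhaseEmissionSketch extends AbstractStreamOperator<String> {

  // Emits the three phases for one staging snapshot, separated by watermarks.
  void emitPhases(
      List<String> mainDataFiles,
      List<String> eqDeleteFiles,
      List<String> stagingDataFiles,
      long baseTs) {
    emitPhase(mainDataFiles, baseTs); // Phase 0: refresh the index from the main branch
    output.emitWatermark(new Watermark(baseTs));
    emitPhase(eqDeleteFiles, baseTs + 1); // Phase 1: resolve equality deletes
    output.emitWatermark(new Watermark(baseTs + 1));
    emitPhase(stagingDataFiles, baseTs + 2); // Phase 2: index new staging data files
    output.emitWatermark(new Watermark(baseTs + 2));
  }

  private void emitPhase(List<String> files, long ts) {
    for (String file : files) {
      output.collect(new StreamRecord<>(file, ts));
    }
  }
}
```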
Adds the committer operator that commits converted data files and DVs
to the main branch.

Receives DVMergeResults from EqualityConvertDVMerger operator and an
EqualityConvertPlanResult from the planner. Once the done-timestamp
watermark arrives, assembles a RowDelta with:
- Data files from the staging branch
- New and merged DVs from the conversion
- Staging position deletes (excluding V3 violations)

Implements idempotency via a snapshot summary property
(equality-convert-staging-snapshot) to safely skip already-committed
staging snapshots on replay after failure.
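A small sketch of what that replay check could look like, assuming the summary property name from this commit (the helper is illustrative, not the PR's committer):

```
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

final class IdempotencySketch {
  private IdempotencySketch() {}

  static boolean alreadyCommitted(Table table, long stagingSnapshotId) {
    String expected = String.valueOf(stagingSnapshotId);
    // The real committer would restrict this scan to the target branch.
    for (Snapshot snapshot : table.snapshots()) {
      String recorded = snapshot.summary().get("equality-convert-staging-snapshot");
      if (expected.equals(recorded)) {
        return true; // this staging snapshot was already converted; skip the commit on replay
      }
    }
    return false;
  }
}
```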
Adds ConvertEqualityDeletes, the API that wires the conversion pipeline as a
Flink table maintenance task. Extends MaintenanceTaskBuilder to integrate with
the maintenance framework's trigger, lock, and aggregator infrastructure.

The pipeline reads equality delete files from a staging branch,
converts them to deletion vectors using a primary key index in Flink
state, and commits the results to the main branch. Mutual exclusion
with compaction is enforced by the maintenance framework lock.

Integration tests cover: basic conversion, multiple deletes, duplicate
keys, multi-snapshot staging, DV merge across runs, compaction snapshots
interleaving, partitioned tables, external commit reindexing, and
staging with data-only snapshots.
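For orientation, a hedged usage sketch: the ConvertEqualityDeletes builder is new in this PR, so the setter names below (stagingBranch, maxSnapshotsPerTrigger) are assumptions based on the Builder fields visible in the diff, while the surrounding TableMaintenance wiring follows the existing Flink maintenance framework (env, tableLoader, and lockFactory are assumed to exist already):

```
TableMaintenance.forTable(env, tableLoader, lockFactory)
    .add(
        ConvertEqualityDeletes.builder()
            .stagingBranch("staging") // branch the writers commit equality deletes to
            .maxSnapshotsPerTrigger(1)) // staging snapshots to convert per run
    .append();
```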
@github-actions github-actions bot added the flink label Apr 16, 2026

public enum Type {
DATA_FILE,
EQ_DELETE_FILE
Contributor

How can we handle concurrent position delete changes?
Shall we have a type for it as well to read the position deletes too?

Contributor

Or not even concurrent position deletes, but just position deletes created by the job itself

Contributor Author

Good point. We handle positional delete files, i.e. we add them as-is to the plan result and merge them back to the main branch alongside the data files and the DVs (see planner). While this may be allowed in V3 (not 100% sure), it would be preferable to merge them with the DVs.

Or not even concurrent position deletes, but just position deletes created by the job itself

I thought we wouldn't typically write positional deletes, but you are right that we create them even in a CDC job, e.g. when a writer receives an insert + delete.

public static class Builder extends MaintenanceTaskBuilder<Builder> {
private String stagingBranch;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private int maxSnapshotsPerTrigger = Integer.MAX_VALUE;
Contributor

Does this mean that we might convert multiple commits on the staging branch into a single commit on the main branch? I'm asking because if this is the case, then we have to be very careful to allow cases like insert->update->delete->insert->update - the order of the events is very important.

Contributor Author

Yes, when maxSnapshotsPerTrigger > 1 (the default is Integer.MAX_VALUE) multiple staging snapshots are collapsed into one main commit. This works because the resulting deletes are positional. We can handle multiple delete+insert operations. You're right that we must resolve equality deletes one by one for each snapshot to build the correct deletion vectors. To do that, we first build the index from the main branch, then resolve the equality deletes of the first staging snapshot, then continue building the index from the data files of the first snapshot on the staging branch. We use incrementing watermarks to separate these phases.

I realize handling multiple snapshots adds some complexity, so it might be worth dropping this feature. We still need the watermark logic to separate building the index and resolving the deletes.

}

private Type type;
private Long mainSnapshotId;
Contributor

why is this Long, and not long?

Contributor Author

If this was an empty table, mainSnapshotId would be null. We might want to special-case this and use long instead.

Comment on lines +121 to +124
Preconditions.checkArgument(
!stagingBranch.equals(targetBranch),
"stagingBranch and targetBranch must be different, but both are '%s'",
stagingBranch);
Contributor

Technically we could use the converter on the same branch as the normal writer as well. Couldn't we?

Contributor

This could be just another type of compaction where we compact the equality delete files into position delete files

Contributor Author

Technically, yes. I wanted to prevent users from running into conflicts when they use the same write branch. We can lift that restriction.

Contributor Author

Removed in 63ecdd8.

Comment on lines +253 to +260
if (lastStagingSnapshotId != null && stagingSnapshot.snapshotId() == lastStagingSnapshotId) {
LOG.info(
"Staging branch '{}' snapshot {} already processed, skipping.",
stagingBranch,
stagingSnapshot.snapshotId());
emitNoOpResult(triggerTs, baseTs);
return;
}
Contributor

Do we need this?
Probably we are better off if we handle main branch changes even if we don't have anything to process immediately.

Later stagingSnapshots.isEmpty() could handle this case as well.

Contributor Author

Yes, probably worth "being ready", even if we don't have anything to process. Could also be configurable.

}

if (needsReindex) {
mainIndexEmittedSet.clear();
Contributor

Could you please help me understand how this will trigger a reindex? Is this a full reindex, or an incremental one?

Contributor Author

In the main loop we always trigger indexing from the main branch if the given equality field set hasn't been indexed yet.

It will be a full re-index.


// Per staging snapshot, emit read commands in three phases with ascending timestamps
// and watermarks between phases to guarantee ordering:
// Phase 0 (baseTs + offset): main data files (index refresh)
Contributor

We need to emit at least the position delete files, as Flink writes them if a record is updated twice during the same snapshot. We might also want to emit the DV changes if we want to prepare for external writers updating the table and deleting rows.

Contributor Author

We emit the positional delete files, as per the comment above, but we don't take care to merge them into DVs. I'll have to address that.

* are scoped to data files visible at that snapshot. Supports multiple equality field sets.
*/
@Internal
public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
Contributor

This is very similar to what @talatuyarer is trying to put together in #14264. Maybe we could reuse the plan logic from there.

Contributor Author

Let me have a look to see if we can share the logic in a way that we can use it here. I would prefer not to depend on other work-in-progress PRs, but I agree it makes sense to consolidate the code paths where possible.


private transient ValueState<Long> mainSnapshotVersion;
private transient ListState<DVPosition> dataRowPositions;
private transient ListState<TimestampedCommand> pendingCommands;
Contributor

Executing multiple conversions adds serious complexity. Is it worth it?
Shall we just do it one by one? We probably also want to keep the original change sequence on the main branch, which means committing changes one by one.

Contributor Author

I thought it would be a clever thing to do, but I agree that it increases the complexity a lot. I'll simplify.

Comment on lines +315 to +318
if (DataOperations.REPLACE.equals(s.operation())) {
needsReindex = true;
break;
}
Contributor

What happens when a compaction commits before our equality delete convert commit?

Something like:

  1. The Planner reads staging snapshots containing equality deletes and emits ReadCommands to downstream readers.
  2. Readers resolve the equality delete against the current main branch data files and produce position deletes / DVs referencing fileA.parquet#N.
  3. Before the Committer can commit, an external Compaction rewrites fileA.parquet into fileMerged.parquet on the main branch.
  4. The Committer commits the DV that still references fileA.parquet, which no longer exists in the current main branch manifest.

Contributor Author

The mode of operation I had in mind is that we run compaction via the Flink maintenance framework, which prevents concurrent execution of maintenance tasks. If there is no locking, then there will be a conflict during committing and the task will fail. When it runs again, it will pick up the changes from the main branch.

Contributor

If there is no locking, then there will be a conflict during committing and the task will fail

I haven't tried this before; can it be done right now?

Comment on lines +79 to +81
ByteBuffer buf = Conversions.toByteBuffer(fieldType, value);
dos.writeInt(buf.limit());
dos.write(buf.array(), buf.arrayOffset() + buf.position(), buf.limit());
Contributor

Should we use limit() here, or remaining(), since position may not always be 0?

Contributor Author

You're right, we should use remaining() to be safe in case position() != 0.
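For reference, the corrected write with the same variables as the snippet above:

```
dos.writeInt(buf.remaining());
dos.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
```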

Contributor Author

Resolved via 22221a9.

Comment on lines +217 to +219
.union(resolved.getSideOutput(TaskResultAggregator.ERROR_STREAM))
.union(merged.getSideOutput(TaskResultAggregator.ERROR_STREAM))
.union(committed.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
Contributor

Should we also union dvPositions?

Contributor Author

We don't currently have an error stream for EqualityDeleteWorker, which creates dvPositions, but we probably want one to prevent errors from failing the entire job.

Comment on lines +151 to +156
private PositionDeleteIndex loadPreviousDV(
String dataFilePath, Map<String, DeleteFile> existingDVs) {
DeleteFile existingDV = existingDVs.get(dataFilePath);
if (existingDV == null) {
return null;
}
Contributor

Is there any possibility of reusing this method with PreviousDeleteLoader in Spark? I'm not entirely sure, as they seem somewhat similar.

private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
private final Map<String, DeleteFileSet> deleteFiles;

Contributor Author

That's a great suggestion. We need to support positional deletes as per Peter's comments, and it looks like BaseDeleteLoader, which Spark uses, also supports that.

Comment on lines +386 to +388
emitMainDataPhase(deletesByFieldIds, baseTs);
emitEqDeletePhase(deletesByFieldIds, baseTs, activeFieldSets);
emitSnapshotDataPhase(snapshotDataFiles, activeFieldSets, baseTs);
Contributor

@Guosmilesmile Guosmilesmile Apr 17, 2026

Could you help me understand whether this part is correct?

Assume the primary key field set is [id], and there are two staging snapshots within the same trigger:

  • S1: Only adds data file F1 (e.g., id=1)
  • S2: Adds equality delete D2 (deletes id=1)

At S1 moment
In EqualityConvertPlanner.java:

  • deletesByFieldIds is empty
  • emitEqDeletePhase is not executed, activeFieldSets remains empty
  • emitSnapshotDataPhase is called, but since there are no field sets when iterating activeFieldSets, it will not emit the DATA_FILE index command for F1

At S2 moment

  • emitEqDeletePhase emits the EQ_DELETE_FILE command and adds [id] to activeFieldSets
  • But this only affects the "current/subsequent" snapshot data phase, and will not go back to retroactively index F1 from S1

Staging data that appears before the 'first occurrence of an eq delete' for a certain field set will not be indexed. Then we will get more data than expected.

Contributor Author

That's correct. Thanks for spotting this. We need to build the set of equality deletes across all staging snapshots first. Alternatively, we could backfill the data for newly discovered equality fields.

Contributor Author

This should be resolved via 2ab5a34.
