Flink: Add equality delete to DV conversion as a table maintenance task#15996
mxm wants to merge 9 commits into apache:main
Conversation
Adds the data types and serialization layer for converting equality deletes to deletion vectors (DVs) within the Flink table maintenance framework. This is the foundation for the conversion pipeline.

Data types:
- SerializedEqualityValues: content-based key wrapper for Flink keyBy
- IndexCommand: planner-to-worker command (ADD_DATA_ROW / RESOLVE_DELETE)
- ReadCommand: planner-to-reader command (DATA_FILE / EQ_DELETE_FILE)
- DVPosition: worker output identifying a row to delete
- DVMergeCommand: resolver-to-merger command with partition context
- DVMergeResult: merger output with new and rewritten DV files
- EqualityConvertPlanResult: planner metadata for resolver and committer

Serialization:
- EqualityFieldSerializer: deterministic key serialization using Iceberg Conversions.toByteBuffer, prefixed with field IDs to support multiple equality field sets without collisions
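The collision-free key layout can be sketched as follows. This is a simplified standalone illustration using a plain `ByteBuffer` (no Iceberg `Conversions`, and `KeySketch`/`serializeKey` are hypothetical names), not the actual `EqualityFieldSerializer`:

```java
import java.nio.ByteBuffer;

public class KeySketch {
  // Prefix the key with the equality field IDs, then append each value as
  // length-prefixed bytes. Two different field sets over the same values
  // thus never produce colliding keys.
  static byte[] serializeKey(int[] fieldIds, ByteBuffer[] values) {
    int size = 4 + fieldIds.length * 4;
    for (ByteBuffer v : values) {
      size += 4 + v.remaining();
    }
    ByteBuffer out = ByteBuffer.allocate(size);
    out.putInt(fieldIds.length);
    for (int id : fieldIds) {
      out.putInt(id);
    }
    for (ByteBuffer v : values) {
      out.putInt(v.remaining());
      out.put(v.duplicate()); // duplicate() leaves the caller's position intact
    }
    return out.array();
  }
}
```

Deterministic byte-level keys like this are what allows Flink's keyBy to shard the index consistently across restarts.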
Adds the file reader and primary key index worker for the equality delete conversion pipeline.

- EqualityConvertReader: reads data files and equality delete files, projects equality fields, serializes keys, and emits IndexCommands. Uses FormatModelRegistry for format-agnostic file reading.
- EqualityConvertWorker: KeyedProcessFunction maintaining a shard of the primary key index in Flink state. Uses phase-aware buffering with event-time timers to guarantee that data rows are indexed before equality deletes are resolved against them. Detects main branch changes and clears stale state.
Adds the DV resolver and merger for the equality delete conversion pipeline.

- EqualityConvertDVResolver: TwoInputStreamOperator that collects DVPositions from workers and EqualityConvertPlanResult from the planner. On watermark, groups positions by data file, resolves partition info and existing DVs from manifests, and emits DVMergeCommands. Includes staging DVs in the merge to prevent V3 violations (two DVs for the same data file).
- EqualityConvertDVMerger: writes merged Puffin DV files. Loads existing DVs for in-memory merge via BaseDVFileWriter. On failure, emits an abort signal to prevent partial commits.
Adds the planner operator that orchestrates the conversion pipeline. Scans the staging branch for new data files and equality delete files, then emits ReadCommands with phase-based event timestamps and watermarks between phases to guarantee ordering:

```
Phase 0: main data files (index refresh)
Phase 1: equality delete files (resolution)
Phase 2: staging data files (index update)
```

Handles main branch changes by detecting external commits or compaction (REPLACE operations) and triggering index rebuilds.
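The phase ordering could be modeled roughly like this. `PHASE_STRIDE` and the timestamp arithmetic are illustrative assumptions, not the operator's actual scheme; the point is only that each phase's timestamps stay strictly below the next phase's, so a watermark between phases lets downstream event-time timers fire once a phase is complete:

```java
public class PhaseSketch {
  // Illustrative stride between phases; the real operator may use any
  // monotonically increasing scheme.
  static final long PHASE_STRIDE = 1_000L;

  enum Phase { MAIN_DATA, EQ_DELETES, STAGING_DATA }

  // Event timestamp of the i-th command within a phase of one staging snapshot.
  static long eventTimestamp(long baseTs, Phase phase, int commandIndex) {
    return baseTs + phase.ordinal() * PHASE_STRIDE + commandIndex;
  }

  // Watermark emitted once a phase is fully sent: every timestamp of that
  // phase is <= this value, so a downstream event-time timer scoped to the
  // phase fires before any record of the next phase is processed.
  static long phaseEndWatermark(long baseTs, Phase phase) {
    return baseTs + phase.ordinal() * PHASE_STRIDE + PHASE_STRIDE - 1;
  }
}
```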
Adds the committer operator that commits converted data files and DVs to the main branch. Receives DVMergeResults from the EqualityConvertDVMerger operator and an EqualityConvertPlanResult from the planner. Once the done-timestamp watermark arrives, assembles a RowDelta with:

- Data files from the staging branch
- New and merged DVs from the conversion
- Staging position deletes (excluding V3 violations)

Implements idempotency via a snapshot summary property (equality-convert-staging-snapshot) to safely skip already-committed staging snapshots on replay after failure.
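The replay-safety check can be sketched like this. It is a simplified standalone model (plain summary maps standing in for Iceberg snapshot summaries, and `IdempotencySketch`/`alreadyCommitted` are hypothetical names); only the property name comes from the PR description:

```java
import java.util.List;
import java.util.Map;

public class IdempotencySketch {
  // Snapshot summary property from the PR description.
  static final String PROP = "equality-convert-staging-snapshot";

  // Each map stands in for the summary of one main-branch snapshot.
  static boolean alreadyCommitted(List<Map<String, String>> mainSummaries, long stagingSnapshotId) {
    String id = Long.toString(stagingSnapshotId);
    for (Map<String, String> summary : mainSummaries) {
      if (id.equals(summary.get(PROP))) {
        return true; // this staging snapshot was already converted; skip on replay
      }
    }
    return false;
  }
}
```

On recovery, a commit attempt for a staging snapshot whose ID already appears in a main-branch summary is skipped, which makes replays after failure safe.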
Adds ConvertEqualityDeletes, the API that wires the conversion pipeline as a Flink table maintenance task. Extends MaintenanceTaskBuilder to integrate with the maintenance framework's trigger, lock, and aggregator infrastructure. The pipeline reads equality delete files from a staging branch, converts them to deletion vectors using a primary key index in Flink state, and commits the results to the main branch. Mutual exclusion with compaction is enforced by the maintenance framework lock. Integration tests cover: basic conversion, multiple deletes, duplicate keys, multi-snapshot staging, DV merge across runs, compaction snapshots interleaving, partitioned tables, external commit reindexing, and staging with data-only snapshots.
```java
public enum Type {
  DATA_FILE,
  EQ_DELETE_FILE
```
How can we handle concurrent position delete changes?
Shall we have a type for it as well to read the position deletes too?
Or not even concurrent position deletes, but just position deletes created by the job itself
Good point. We handle positional delete files, i.e. we add them as-is to the plan result and merge them back to the main branch alongside the data files and the DVs (see planner). While this may be allowed in V3 (not 100% sure), it would be preferable to merge them with the DVs.
> Or not even concurrent position deletes, but just position deletes created by the job itself
I thought we wouldn't typically write positional deletes, but you are right that we create them even in a CDC job, e.g. when a writer receives an insert + delete.
```java
public static class Builder extends MaintenanceTaskBuilder<Builder> {
  private String stagingBranch;
  private String targetBranch = SnapshotRef.MAIN_BRANCH;
  private int maxSnapshotsPerTrigger = Integer.MAX_VALUE;
```
Does this mean that we might convert multiple commits to the staging branch into a single commit on the main branch? I'm asking because, if this is the case, then we have to be very careful to allow cases like insert->update->delete->insert->update, where the order of the events is very important.
Yes, when maxSnapshotsPerTrigger > 1 (the default is Integer.MAX_VALUE) multiple staging snapshots are collapsed into one main commit. This works because the resulting deletes are positional. We can handle multiple delete+insert operations. You're right that we must resolve equality deletes one by one for each snapshot to build the correct deletion vectors. To do that, we first build the index from the main branch, then resolve the equality deletes of the first staging snapshot, then continue building the index from the data files of the first snapshot on the staging branch. We use incrementing watermarks to separate these phases.
I realize handling multiple snapshots adds some complexity, so it might be worth dropping this feature. We still need the watermark logic to separate building the index and resolving the deletes.
```java
private Type type;
private Long mainSnapshotId;
```
why is this Long, and not long?
If this was an empty table, mainSnapshotId would be null. We might want to special-case this and use long instead.
```java
Preconditions.checkArgument(
    !stagingBranch.equals(targetBranch),
    "stagingBranch and targetBranch must be different, but both are '%s'",
    stagingBranch);
```
Technically we could use the converter on the same branch as the normal writer as well, couldn't we?
This could be just another type of compaction, where we compact the equality delete files into position delete files.
Technically, yes. I wanted to prevent users from running into conflicts when they use the same write branch. We can lift that restriction.
```java
if (lastStagingSnapshotId != null && stagingSnapshot.snapshotId() == lastStagingSnapshotId) {
  LOG.info(
      "Staging branch '{}' snapshot {} already processed, skipping.",
      stagingBranch,
      stagingSnapshot.snapshotId());
  emitNoOpResult(triggerTs, baseTs);
  return;
}
```
Do we need this?
Probably we are better off if we handle main branch changes even if we don't have anything to process immediately.
Later, `stagingSnapshots.isEmpty()` could handle this case as well.
Yes, it's probably worth "being ready", even if we don't have anything to process. This could also be configurable.
```java
if (needsReindex) {
  mainIndexEmittedSet.clear();
```
Could you please help me by describing how this will trigger a reindex? Is this a full reindex, or an incremental one?
In the main loop, we always trigger indexing from the main branch if the given equality field set hasn't been indexed. This is the path:
It will be a full re-index.
```java
// Per staging snapshot, emit read commands in three phases with ascending timestamps
// and watermarks between phases to guarantee ordering:
// Phase 0 (baseTs + offset): main data files (index refresh)
```
At a minimum, we need to emit the position delete files, as Flink writes them if a record is updated twice during the same snapshot. We might also want to emit the DV changes if we want to prepare for external writers updating the table and deleting rows.
We emit the positional delete files, as per comment above, but we don't take care to merge them into DVs. I'll have to address that.
```java
 * are scoped to data files visible at that snapshot. Supports multiple equality field sets.
 */
@Internal
public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
```
This is very similar to what @talatuyarer tries to put together in #14264
Maybe we could reuse the plan logic from there.
Let me have a look to see whether we can share the logic in a way that we can use it here. I would prefer not to depend on other work-in-progress PRs, but I agree it makes sense to consolidate the code paths where possible.
```java
private transient ValueState<Long> mainSnapshotVersion;
private transient ListState<DVPosition> dataRowPositions;
private transient ListState<TimestampedCommand> pendingCommands;
```
Executing multiple conversions adds serious complexity. Is this worth it?
Shall we just do it one-by-one? We probably also want to keep the original change sequence in the main branch. That means committing changes one-by-one.
I thought it would be a clever thing to do, but I agree that it increases the complexity a lot. I'll simplify.
```java
if (DataOperations.REPLACE.equals(s.operation())) {
  needsReindex = true;
  break;
}
```
What will happen when a compaction commits before our equality delete convert commit?
Something like:
- The Planner reads staging snapshots containing equality deletes and emits ReadCommands to downstream readers.
- Readers resolve the equality delete against the current main branch data files and produce position deletes / DVs referencing fileA.parquet#N.
- Before the Committer can commit, an external Compaction rewrites fileA.parquet into fileMerged.parquet on the main branch.
- The Committer commits the DV that still references fileA.parquet, which no longer exists in the current main branch manifest.
The mode of operation which I had in mind is that we run compaction via the Flink maintenance framework, which prevents concurrent execution of maintenance tasks. If there is no locking, then there will be a conflict during committing and the task will fail. When it runs again, it will pick up the changes from the main branch.
> If there is no locking, then there will be a conflict during committing and the task will fail
I haven't tried this before. Can it be done right now?
```java
ByteBuffer buf = Conversions.toByteBuffer(fieldType, value);
dos.writeInt(buf.limit());
dos.write(buf.array(), buf.arrayOffset() + buf.position(), buf.limit());
```
Should we use limit() here or use remaining() since position may be not always 0?
You're right, we should use remaining() to handle the case where position() != 0.
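The difference the comment points out can be demonstrated with a plain `ByteBuffer` (the `BufferSketch` wrapper is just for illustration): once some bytes have been consumed, `limit()` overstates the unread length, while `remaining()` (i.e. `limit() - position()`) is the number of bytes actually left:

```java
import java.nio.ByteBuffer;

public class BufferSketch {
  // Returns {limit, remaining} for a buffer whose position is non-zero.
  static int[] limitVsRemaining() {
    ByteBuffer buf = ByteBuffer.allocate(8);
    buf.putInt(7);
    buf.putInt(9);
    buf.flip();   // position = 0, limit = 8
    buf.getInt(); // consume the first value -> position = 4
    // limit() still reports 8, but only 4 bytes remain unread. Writing
    // limit() bytes starting at the current position would over-read.
    return new int[] {buf.limit(), buf.remaining()};
  }
}
```

So both the length prefix and the write call should use `remaining()` to be correct for any buffer position.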
```java
    .union(resolved.getSideOutput(TaskResultAggregator.ERROR_STREAM))
    .union(merged.getSideOutput(TaskResultAggregator.ERROR_STREAM))
    .union(committed.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
```
Should we also union dvPositions?
We don't currently have an error stream for EqualityDeleteWorker, which creates dvPositions, but we probably want one to prevent errors from failing the entire job.
```java
private PositionDeleteIndex loadPreviousDV(
    String dataFilePath, Map<String, DeleteFile> existingDVs) {
  DeleteFile existingDV = existingDVs.get(dataFilePath);
  if (existingDV == null) {
    return null;
  }
```
Is there any possibility of reusing this method with PreviousDeleteLoader in Spark? I'm not entirely sure, as they seem somewhat similar.
That's a great suggestion. We need to support positional deletes as per Peter's comments and it looks like BaseDeleteLoader which Spark uses also supports that.
```java
emitMainDataPhase(deletesByFieldIds, baseTs);
emitEqDeletePhase(deletesByFieldIds, baseTs, activeFieldSets);
emitSnapshotDataPhase(snapshotDataFiles, activeFieldSets, baseTs);
```
Could you help me understand whether this part is correct?
Assume the primary key field set is [id], and there are two staging snapshots within the same trigger:
- S1: Only adds data file F1 (e.g., id=1)
- S2: Adds equality delete D2 (deletes id=1)
At S1 moment
In EqualityConvertPlanner.java:
- `deletesByFieldIds` is empty
- `emitEqDeletePhase` is not executed, so `activeFieldSets` remains empty
- `emitSnapshotDataPhase` is called, but since there are no field sets when iterating `activeFieldSets`, it will not emit the DATA_FILE index command for F1
At S2 moment
- `emitEqDeletePhase` emits the `EQ_DELETE_FILE` command and adds `[id]` to `activeFieldSets`
- But this only affects the current/subsequent snapshot data phase, and will not go back to retroactively index F1 from S1
Staging data that appears before the first occurrence of an eq delete for a given field set will not be indexed. Then we will get more data than expected.
That's correct. Thanks for spotting this. We need to build the set of equality deletes across all staging snapshots first. Alternatively, we could backfill the data for newly discovered equality fields.
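The first fix discussed above could look roughly like this: a standalone sketch with a hypothetical `StagingSnapshot` stand-in, collecting the equality field sets across all staging snapshots up front, before any data phase is emitted:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class FieldSetPrescanSketch {
  // Stand-in for a staging snapshot, reduced to the equality field sets
  // of its delete files (field sets are sets of Iceberg field IDs).
  record StagingSnapshot(List<Set<Integer>> equalityFieldSets) {}

  // Collect every equality field set across all staging snapshots first, so
  // data files from early snapshots (like F1 in S1) are indexed for field
  // sets that only appear in later snapshots' deletes (like D2 in S2).
  static Set<Set<Integer>> collectActiveFieldSets(List<StagingSnapshot> snapshots) {
    Set<Set<Integer>> active = new LinkedHashSet<>();
    for (StagingSnapshot s : snapshots) {
      active.addAll(s.equalityFieldSets());
    }
    return active;
  }
}
```

With this prescan, the S1/S2 scenario above emits the DATA_FILE index command for F1, because `[id]` is already active when S1's data phase runs.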
In order to get rid of equality deletes, we first need to be able to resolve existing equality delete files into positional deletes. For this PR, we opted to use deletion vectors over regular delete files due to their efficiency in terms of space and lookup.
The idea is to allow writers to write equality deletes like they normally do (e.g. via IcebergSink), but commit to a staging branch first. The maintenance task monitors the staging branch, builds an index from the equality field values of the data files, converts the equality delete files into DVs, and commits everything back to the main branch, where it can be read efficiently using only positional deletes.

This workflow will require additional changes to IcebergSink, to write data to the staging branch instead of the main branch and to run the here-added ConvertEqualityDeletes maintenance task in-line via its post-commit topology. I was tempted to add this right away, but decided to keep the scope limited to the maintenance task only.

The maintenance task topology looks like this:
Limitations
To keep the scope somewhat limited, I skipped these:
Follow-ups
I'm planning to work on these next:
Commits
The breakdown is as follows:
These could be put into separate PRs, if needed.
Metrics
Under the maintenance metric group:
References