Flink: Persist flink job id in DynamicWriteResultAggregator state #16011
Open
lrsb wants to merge 2 commits into apache:main from
Conversation
The DynamicCommitter deduplicates commits on the triplet (flink.job-id, flink.operator-id, max-committed-checkpoint-id) stored in each snapshot summary. The aggregator previously sampled the job id from the runtime environment on every operator open(), so a restart or rescale with a fresh Flink JobID would stamp restored write results with the new id. The committer's ancestor walk then could not find the snapshot committed under the old id, and silently re-committed the data, producing duplicate rows in the target table.

Persist the aggregator's job id as operator state and reuse it on restore, so committables keep the job id under which their data was originally produced. This restores the dedup invariant across job restarts, autoscaler savepoint+resubmit cycles, and session-cluster resubmissions, which was broken under sustained catalog-side latency where commit responses were lost client-side.
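The dedup walk described above can be sketched as follows. This is a hypothetical illustration, not the actual DynamicCommitter code: the summary key names mirror the triplet named in the paragraph (with the `flink.` prefix assumed for the checkpoint key), and `alreadyCommitted` is an invented helper.

```java
import java.util.List;
import java.util.Map;

// Illustrative sketch: walk ancestor snapshot summaries and check whether a
// checkpoint was already committed under the same (job id, operator id) pair.
class DedupSketch {
  static boolean alreadyCommitted(
      List<Map<String, String>> ancestorSummaries,
      String jobId,
      String operatorId,
      long checkpointId) {
    for (Map<String, String> summary : ancestorSummaries) {
      if (jobId.equals(summary.get("flink.job-id"))
          && operatorId.equals(summary.get("flink.operator-id"))
          && checkpointId
              <= Long.parseLong(
                  summary.getOrDefault("flink.max-committed-checkpoint-id", "-1"))) {
        return true;
      }
    }
    return false;
  }
}
```

Note how the lookup fails the moment the job id changes: a snapshot committed under the old JobID is invisible to a restored job that stamps its committables with a fresh one, which is exactly the duplicate-commit path this PR closes.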
DynamicWriteResultAggregator read the flink job id and operator id live from the runtime environment each time it built a DynamicCommittable. After a restart or rescale these values change, so committables produced from restored state were stamped with the new job id, defeating the deduplication that DynamicCommitter relies on. Use the aggregator's stored flinkJobId (restored from operator state) and operatorId fields when constructing committables, and normalize the operatorId source in open() to getOperatorUniqueID().
Fixes #16008