
Flink: Persist flink job id in DynamicWriteResultAggregator state #16011

Open
lrsb wants to merge 2 commits into apache:main from lrsb:github

Conversation


@lrsb lrsb commented Apr 17, 2026

Fixes #16008

The DynamicCommitter deduplicates commits on the triplet (flink.job-id, flink.operator-id, max-committed-checkpoint-id) stored in each snapshot summary. The aggregator previously sampled the job id from the runtime environment on every operator open(), so a restart or rescale with a fresh Flink JobID would stamp restored write results with the new id. The committer's ancestor walk then could not find the snapshot committed under the old id, and silently re-committed the data, producing duplicate rows in the target table.

Persist the aggregator's job id as operator state and reuse it on restore, so committables keep the job id under which their data was originally produced. This restores the dedup invariant across job restarts, autoscaler savepoint+resubmit cycles, and session-cluster resubmissions. The invariant was previously broken in the reported scenario of sustained catalog-side latency, where commit responses were lost client-side.
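To illustrate the failure mode, here is a minimal, self-contained sketch of the dedup check described above. This is not the actual Iceberg code: the real DynamicCommitter walks Iceberg snapshot ancestors and reads their summary maps, and all names here (`Summary`, `alreadyCommitted`) are hypothetical stand-ins.

```java
import java.util.List;

// Illustrative model only: the real DynamicCommitter walks Iceberg snapshot
// ancestors; this sketch models each snapshot summary as a record holding
// the (jobId, operatorId, max-committed-checkpoint-id) triplet.
class DedupSketch {
  record Summary(String jobId, String operatorId, long maxCommittedCheckpointId) {}

  // Returns true if a commit for this triplet is already present in the
  // snapshot history, meaning the committable must be skipped on restore.
  static boolean alreadyCommitted(
      List<Summary> ancestors, String jobId, String operatorId, long checkpointId) {
    for (Summary s : ancestors) {
      if (s.jobId().equals(jobId)
          && s.operatorId().equals(operatorId)
          && s.maxCommittedCheckpointId() >= checkpointId) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<Summary> history = List.of(new Summary("job-A", "op-1", 42L));

    // Same job id: the walk finds the snapshot, the commit is deduplicated.
    System.out.println(alreadyCommitted(history, "job-A", "op-1", 42L)); // true

    // After a restart the runtime reports a fresh JobID; stamping restored
    // write results with it makes the walk miss the old snapshot, so the
    // same data is committed again, producing duplicate rows.
    System.out.println(alreadyCommitted(history, "job-B", "op-1", 42L)); // false
  }
}
```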

@github-actions github-actions bot added the flink label Apr 17, 2026
@lrsb lrsb marked this pull request as ready for review April 17, 2026 12:41
@lrsb lrsb marked this pull request as draft April 17, 2026 13:15
lrsb added 2 commits April 17, 2026 15:25
DynamicWriteResultAggregator read the flink job id and operator id
live from the runtime environment each time it built a DynamicCommittable.
After a restart or rescale these values change, so committables produced
from restored state were stamped with the new job id, defeating the
persistence that DynamicCommitter dedup relies on.

Use the aggregator's stored flinkJobId (restored from operator state)
and operatorId fields when constructing committables, and normalize the
operatorId source in open() to getOperatorUniqueID().
@lrsb lrsb marked this pull request as ready for review April 17, 2026 13:27
