
[TASK-389] Idempotent writes#404

Open
fresh-borzoni wants to merge 5 commits into apache:main from fresh-borzoni:rust-sdk-idempotent-writes

Conversation


@fresh-borzoni fresh-borzoni commented Mar 1, 2026

Summary

closes #389

Implements idempotent writes for the Rust SDK. When enabled (default), the server allocates a writer ID and tracks per-bucket sequence numbers so it can detect and deduplicate retried batches. This makes retries safe - a transient network failure or leader failover no longer risks duplicate records.

How it works

Before the first batch is sent, the sender requests a writer ID from the server via a new InitWriter RPC. Each batch is then stamped with (writer_id, batch_sequence) before being sent. The server uses this pair to reject duplicates: if it sees the same sequence again, it returns the previous result instead of writing twice.

The IdempotenceManager owns per-bucket state: the next sequence to assign, the in-flight batch queue, and error recovery logic. When a batch completes successfully, it's removed from the in-flight queue and the next sequence advances. When a batch fails with a retriable error, it gets re-enqueued with its original sequence number - the server expects that exact sequence on retry.

Two server errors trigger a full reset of writer state: UnknownWriterId (server lost our writer session) and OutOfOrderSequence that can't be resolved by reordering. On reset, the writer ID is cleared, all pending sequences are invalidated, and a new writer ID is allocated on the next drain cycle. This matches Java's recovery semantics.

Retry scenarios and sequence reordering

With up to 5 batches in-flight per bucket, responses can arrive out of order. The idempotence manager handles this by tracking per-bucket state: last_acked_sequence, an ordered in-flight queue, and next_sequence.

Happy path - out-of-order completion. Batches with sequences 0, 1, 2 are in-flight. Batch 1 completes before batch 0. last_acked_sequence advances to 1 (it always tracks the highest completed sequence, not strictly sequential). When batch 0 later completes, last_acked_sequence stays at 1 since 0 < 1. The server accepts both because they each carry the correct sequence. No reordering needed on the client.

Retriable failure with sequence adjustment. Batches 0, 1, 2 are in-flight. Batch 1 fails with a retriable error (e.g., NetworkException) and will be retried. But the server has never seen sequence 1, so when batch 1 is re-enqueued, there's a gap: the server expects 0, 1, 2 in order but would see 0, 2 (success), then 1 (retry). To fix this, handle_failed_batch with adjust_sequences=true removes batch 1 from the in-flight queue, decrements batch 2's sequence from 2 -> 1, and rolls back next_sequence from 3 -> 2. Now batch 2 arrives with sequence 1 (filling the gap), and when batch 1 is retried after re-drain, it gets a fresh sequence. The adjusted batches are marked with a reset_batch_id flag so can_retry_for_error knows they were rewritten and treats OutOfOrderSequence as retriable for them.

Non-retriable failure - no adjustment. If batch 1 exhausts its retries, adjust_sequences=false and sequences are left as-is. The server will see a gap (sequence 1 missing), which triggers OutOfOrderSequence for batch 2. Since batch 2 is not the "next expected" sequence (last_acked + 1), can_retry_for_error returns true: this is a reordering issue, not a genuine violation, so batch 2 gets retried. If a batch is the next expected sequence and still gets OutOfOrderSequence, that's a genuine server-side inconsistency, so the writer ID is reset and all state is cleared.
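The retry decision for OutOfOrderSequence reduces to a small predicate; a hedged sketch (the real can_retry_for_error takes richer batch/error types, this only shows the logic):

```rust
/// Decide whether an OutOfOrderSequence error on a batch is retriable.
fn can_retry_out_of_order(
    batch_sequence: i32,
    last_acked_sequence: i32,
    reset_batch_id: bool,
) -> bool {
    // Batches rewritten by sequence adjustment are always retriable:
    // the error is an expected consequence of the renumbering.
    if reset_batch_id {
        return true;
    }
    // If this batch is not the next expected sequence, the error is an
    // artifact of an earlier gap, not a genuine violation -> retry.
    batch_sequence != last_acked_sequence + 1
}

fn main() {
    // Batch 2 while last_acked = 0: gap-induced error, retriable.
    assert!(can_retry_out_of_order(2, 0, false));
    // Batch 1 while last_acked = 0: genuinely next expected, so an
    // OutOfOrderSequence here means server-side inconsistency -> reset.
    assert!(!can_retry_out_of_order(1, 0, false));
}
```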

UnknownWriterId / OutOfOrderSequence - full reset. These indicate the server lost our session or detected an unrecoverable gap. handle_failed_batch clears the writer ID and wipes all bucket entries. On the next drain cycle, the sender allocates a fresh writer ID, and maybe_update_writer_id resets each bucket's sequence to 0 (but only once its in-flight queue is empty, to avoid corrupting batches still awaiting responses from the old session).

Memory backpressure

The accumulator now enforces a memory budget (default 64MB) across all buffered batches. When the buffer is full, upsert()/append() block until in-flight batches complete and release memory. This prevents unbounded memory growth when the application writes faster than the network can drain. The MemoryLimiter is semaphore-style with byte granularity and RAII permits; memory is released automatically when a batch is completed or dropped.

Where we diverged from Java

Sequence assignment timing. Java assigns sequences at append() time; its sender thread blocks at startup until the writer ID is allocated, so the ID is always available. Our append() is synchronous on the caller's thread with no guarantee that the async writer ID allocation has completed, so we assign at drain() time instead, by which point the sender has already ensured the ID exists. Concretely, the sender calls maybe_wait_for_writer_id() before each drain cycle, which allocates the ID via the InitWriter RPC if needed, guaranteeing it exists before any batch is stamped.

Sender concurrency model. Java's sender calls sendWriteData() in a fire-and-forget style, with Netty I/O threads handling responses asynchronously. We use a FuturesUnordered event loop in a single tokio task: drain batches, fire all per-leader RPCs concurrently, then interleave response handling with the next drain cycle. This gives the same parallelism without dedicated threads per connection. When one leader is slow, responses from faster leaders still flow in and trigger new drain cycles, i.e. no head-of-line blocking.

Shutdown. Java uses a volatile boolean forceClose flag that the sender loop polls. Tokio tasks support external cancellation, so we use tokio::select! with a timeout: signal graceful drain, wait up to the timeout for the sender to finish, then JoinHandle::abort() if it hasn't. BroadcastOnce::Drop automatically notifies any callers awaiting write results when their in-flight futures are cancelled, so no writes silently disappear.

@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch from 3e93a40 to 5cfb933 Compare March 1, 2026 12:46
@fresh-borzoni fresh-borzoni marked this pull request as draft March 1, 2026 12:50
@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch from 5cfb933 to 6805917 Compare March 1, 2026 13:17
@fresh-borzoni fresh-borzoni marked this pull request as ready for review March 1, 2026 13:36

fresh-borzoni commented Mar 1, 2026

@luoyuxia @leekeiabstraction PTAL 🙏

I did my best with PR description to explain the feature, scenarios and why rust differs in some areas.

I'll handle docs in a separate PR; they usually cause annoying conflicts, so I'd rather avoid that in the same PR

@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch from 6805917 to b5ea105 Compare March 1, 2026 14:06
if futures.is_empty() {
// Nothing to drain. Sleep for the ready-check
// delay to avoid busy-spinning.
// TODO: add a Notify that append() signals so we

@fresh-borzoni fresh-borzoni Mar 1, 2026


this busy spin matches Java logic, but in Rust it's easier to fix tbh, so I'll handle this next as the PR is already getting big. Should be just tokio::sync::Notify

Seems that tokio::spawn + FuturesUnordered change is making our life easier

@fresh-borzoni

fixed clippy warn, missed it, sorry


Copilot AI left a comment


Pull request overview

Implements idempotent write support in the Rust SDK by introducing writer ID allocation and per-bucket batch sequencing, plus client-side memory backpressure to bound buffered write data.

Changes:

  • Add InitWriter RPC support (proto + API key + request/response message types) to allocate server-side writer IDs.
  • Introduce IdempotenceManager and integrate it into the accumulator/sender pipeline to stamp batches with (writer_id, batch_sequence) and handle retry/reset semantics.
  • Add write-buffer memory limiting and new configuration knobs (propagated to Python/C++ bindings) to provide backpressure and prevent unbounded buffering.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.

Summary per file (File — Description):
crates/fluss/src/rpc/message/mod.rs Registers and re-exports the new init_writer RPC message module.
crates/fluss/src/rpc/message/init_writer.rs Defines InitWriterRequest and wires it into the versioned RPC framework.
crates/fluss/src/rpc/api_key.rs Adds ApiKey::InitWriter and maps it to protocol key 1026.
crates/fluss/src/record/arrow.rs Adds writer state setter so Arrow batches can be restamped with writer metadata.
crates/fluss/src/proto/fluss_api.proto Adds InitWriterRequest/Response protobuf messages (proto2).
crates/fluss/src/config.rs Adds idempotence + buffer/backpressure config, validation, and tests.
crates/fluss/src/client/write/writer_client.rs Integrates idempotence manager + new sender shutdown model and timeout-based close.
crates/fluss/src/client/write/sender.rs Adds writer-id initialization, idempotent retry/reset handling, and an event-loop send/response model.
crates/fluss/src/client/write/mod.rs Exposes the new idempotence module internally to the write subsystem.
crates/fluss/src/client/write/idempotence.rs Implements per-bucket sequencing, in-flight tracking, retry eligibility, and reset semantics + unit tests.
crates/fluss/src/client/write/batch.rs Adds batch writer metadata fields and plumbing to apply writer state to both Arrow + KV batches.
crates/fluss/src/client/write/accumulator.rs Adds memory limiter/backpressure and drain/re-enqueue sequencing logic for idempotent writes.
bindings/python/src/config.rs Adds Python config parsing + getters/setters for idempotence and buffer settings.
bindings/python/fluss/init.pyi Updates Python typing stubs for new config properties.
bindings/cpp/src/lib.rs Extends C++ FFI config struct to include new writer/idempotence/buffer fields.
bindings/cpp/src/ffi_converter.hpp Converts new C++ config fields into the Rust FFI config struct.
bindings/cpp/include/fluss.hpp Exposes new writer/idempotence/buffer config fields in the public C++ configuration struct.


@fresh-borzoni

Addressed comments


@charlesdong1991 charlesdong1991 left a comment


Really nice PR! 👍 just minor nit comments/questions

@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch from ca79f6d to dd5c42f Compare March 5, 2026 18:14

@leekeiabstraction leekeiabstraction left a comment


TY for the PR, left a couple of comments

@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch from e914731 to 1392187 Compare March 7, 2026 01:39
@fresh-borzoni

Addressed comments

@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch 2 times, most recently from c5ba7a9 to 189c3de Compare March 7, 2026 02:11
@fresh-borzoni fresh-borzoni force-pushed the rust-sdk-idempotent-writes branch from 189c3de to 360a277 Compare March 7, 2026 02:34

@charlesdong1991 charlesdong1991 left a comment


Very nice PR!! Thanks!!



Successfully merging this pull request may close these issues.

Idempotent writes with writer ID allocation and batch sequencing
