Conversation
@luoyuxia @leekeiabstraction PTAL 🙏 I did my best with the PR description to explain the feature, the scenarios, and why Rust differs in some areas. I'll handle docs in a separate PR; they usually cause annoying conflicts, so I'd rather avoid them in the same PR.
```rust
if futures.is_empty() {
    // Nothing to drain. Sleep for the ready-check
    // delay to avoid busy-spinning.
    // TODO: add a Notify that append() signals so we
```
this busy spin matches the Java logic, but in Rust it's easier to fix tbh, so I'll handle it next as this PR is already getting big. Should be just a tokio::sync::Notify.
Seems that the tokio::spawn + FuturesUnordered change is making our life easier
Fixed a clippy warning, missed it, sorry
Pull request overview
Implements idempotent write support in the Rust SDK by introducing writer ID allocation and per-bucket batch sequencing, plus client-side memory backpressure to bound buffered write data.
Changes:
- Add `InitWriter` RPC support (proto + API key + request/response message types) to allocate server-side writer IDs.
- Introduce `IdempotenceManager` and integrate it into the accumulator/sender pipeline to stamp batches with `(writer_id, batch_sequence)` and handle retry/reset semantics.
- Add write-buffer memory limiting and new configuration knobs (propagated to the Python/C++ bindings) to provide backpressure and prevent unbounded buffering.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| crates/fluss/src/rpc/message/mod.rs | Registers and re-exports the new init_writer RPC message module. |
| crates/fluss/src/rpc/message/init_writer.rs | Defines InitWriterRequest and wires it into the versioned RPC framework. |
| crates/fluss/src/rpc/api_key.rs | Adds ApiKey::InitWriter and maps it to protocol key 1026. |
| crates/fluss/src/record/arrow.rs | Adds writer state setter so Arrow batches can be restamped with writer metadata. |
| crates/fluss/src/proto/fluss_api.proto | Adds InitWriterRequest/Response protobuf messages (proto2). |
| crates/fluss/src/config.rs | Adds idempotence + buffer/backpressure config, validation, and tests. |
| crates/fluss/src/client/write/writer_client.rs | Integrates idempotence manager + new sender shutdown model and timeout-based close. |
| crates/fluss/src/client/write/sender.rs | Adds writer-id initialization, idempotent retry/reset handling, and an event-loop send/response model. |
| crates/fluss/src/client/write/mod.rs | Exposes the new idempotence module internally to the write subsystem. |
| crates/fluss/src/client/write/idempotence.rs | Implements per-bucket sequencing, in-flight tracking, retry eligibility, and reset semantics + unit tests. |
| crates/fluss/src/client/write/batch.rs | Adds batch writer metadata fields and plumbing to apply writer state to both Arrow + KV batches. |
| crates/fluss/src/client/write/accumulator.rs | Adds memory limiter/backpressure and drain/re-enqueue sequencing logic for idempotent writes. |
| bindings/python/src/config.rs | Adds Python config parsing + getters/setters for idempotence and buffer settings. |
| bindings/python/fluss/__init__.pyi | Updates Python typing stubs for new config properties. |
| bindings/cpp/src/lib.rs | Extends C++ FFI config struct to include new writer/idempotence/buffer fields. |
| bindings/cpp/src/ffi_converter.hpp | Converts new C++ config fields into the Rust FFI config struct. |
| bindings/cpp/include/fluss.hpp | Exposes new writer/idempotence/buffer config fields in the public C++ configuration struct. |
Addressed comments
charlesdong1991
left a comment
Really nice PR! 👍 just minor nit comments/questions
leekeiabstraction
left a comment
TY for the PR, left a couple of comments
Addressed comments
charlesdong1991
left a comment
Very nice PR!! Thanks!!
Summary
closes #389
Implements idempotent writes for the Rust SDK. When enabled (the default), the server allocates a writer ID and tracks per-bucket sequence numbers so it can detect and deduplicate retried batches. This makes retries safe: a transient network failure or leader failover no longer risks duplicate records.
How it works
Before the first batch is sent, the sender requests a writer ID from the server via a new `InitWriter` RPC. Each batch is then stamped with `(writer_id, batch_sequence)` before being sent. The server uses this pair to reject duplicates: if it sees the same sequence again, it returns the previous result instead of writing twice.

The `IdempotenceManager` owns per-bucket state: the next sequence to assign, the in-flight batch queue, and error recovery logic. When a batch completes successfully, it is removed from the in-flight queue and the next sequence advances. When a batch fails with a retriable error, it is re-enqueued with its original sequence number; the server expects that exact sequence on retry.

Two server errors trigger a full reset of writer state: `UnknownWriterId` (the server lost our writer session) and an `OutOfOrderSequence` that can't be resolved by reordering. On reset, the writer ID is cleared, all pending sequences are invalidated, and a new writer ID is allocated on the next drain cycle. This matches Java's recovery semantics.

Retry scenarios and sequence reordering
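A std-only sketch of the per-bucket bookkeeping these scenarios exercise. Names, types, and method signatures are illustrative simplifications, not the SDK's actual `IdempotenceManager` API:

```rust
use std::collections::VecDeque;

// Hypothetical, simplified model of one bucket's idempotence state.
struct BucketState {
    next_sequence: i32,        // sequence stamped on the next drained batch
    last_acked_sequence: i32,  // highest successfully acked sequence (-1 = none)
    in_flight: VecDeque<i32>,  // sequences awaiting a response, in send order
}

impl BucketState {
    fn new() -> Self {
        BucketState { next_sequence: 0, last_acked_sequence: -1, in_flight: VecDeque::new() }
    }

    // Stamp a batch at drain time: assign the sequence and advance.
    fn assign_sequence(&mut self) -> i32 {
        let seq = self.next_sequence;
        self.in_flight.push_back(seq);
        self.next_sequence += 1;
        seq
    }

    // On success: drop the batch from the in-flight queue and track the
    // highest acked sequence (acks may arrive out of order).
    fn handle_completed(&mut self, seq: i32) {
        self.in_flight.retain(|&s| s != seq);
        self.last_acked_sequence = self.last_acked_sequence.max(seq);
    }

    // On a retriable failure with adjustment: remove the failed batch,
    // close the gap by decrementing every later in-flight sequence, and
    // roll back next_sequence so the retry gets a fresh number.
    fn handle_failed_adjusting(&mut self, seq: i32) {
        self.in_flight.retain(|&s| s != seq);
        for s in self.in_flight.iter_mut() {
            if *s > seq {
                *s -= 1;
            }
        }
        self.next_sequence -= 1;
    }
}
```

With batches 0, 1, 2 in flight, a failure of batch 1 leaves batch 2 re-stamped as sequence 1 and `next_sequence` rolled back to 2, matching the adjustment described below.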
With up to 5 batches in flight per bucket, responses can arrive out of order. The idempotence manager handles this by tracking per-bucket state: `last_acked_sequence`, an ordered in-flight queue, and `next_sequence`.

Happy path - out-of-order completion. Batches with sequences 0, 1, 2 are in-flight. Batch 1 completes before batch 0. `last_acked_sequence` advances to 1 (it always tracks the highest completed sequence, not a strictly sequential one). When batch 0 later completes, `last_acked_sequence` stays at 1 since 0 < 1. The server accepts both because each carries the correct sequence. No reordering is needed on the client.

Retriable failure with sequence adjustment. Batches 0, 1, 2 are in-flight. Batch 1 fails with a retriable error (e.g., `NetworkException`) and will be retried. But the server has never seen sequence 1, so when batch 1 is re-enqueued there is a gap: the server expects 0, 1, 2 in order but would see 0, 2 (success), then 1 (retry). To fix this, `handle_failed_batch` with `adjust_sequences=true` removes batch 1 from the in-flight queue, decrements batch 2's sequence from 2 -> 1, and rolls back `next_sequence` from 3 -> 2. Now batch 2 arrives with sequence 1 (filling the gap), and when batch 1 is retried after re-drain, it gets a fresh sequence. The adjusted batches are marked with a `reset_batch_id` flag so `can_retry_for_error` knows they were rewritten and treats `OutOfOrderSequence` as retriable for them.

Non-retriable failure - no adjustment. If batch 1 exhausts its retries, `adjust_sequences=false` and sequences are left as-is. The server will see a gap (sequence 1 missing), which triggers `OutOfOrderSequence` for batch 2. Since batch 2 is not the "next expected" sequence (`last_acked + 1`), `can_retry_for_error` returns true: it's a reordering issue, not a genuine violation. Batch 2 gets retried. If it is the next expected sequence and still gets `OutOfOrderSequence`, that's a genuine server-side inconsistency; the writer ID is reset and all state is cleared.

`UnknownWriterId` / unrecoverable `OutOfOrderSequence` - full reset. These indicate the server lost our session or detected an unrecoverable gap. `handle_failed_batch` clears the writer ID and wipes all bucket entries. On the next drain cycle, the sender allocates a fresh writer ID, and `maybe_update_writer_id` resets each bucket's sequence to 0 (but only once its in-flight queue is empty, to avoid corrupting batches still awaiting responses from the old session).

Memory backpressure
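The write buffer's backpressure rests on a semaphore-style byte limiter with RAII permits. Below is a std-only model of that idea; it is a hypothetical simplification, not the SDK's actual `MemoryLimiter`:

```rust
use std::sync::{Arc, Condvar, Mutex};

// Shared state: available bytes plus a condvar to wake blocked writers.
struct MemoryLimiter {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

// RAII permit: holding it reserves bytes; dropping it releases them.
struct Permit {
    inner: Arc<(Mutex<usize>, Condvar)>,
    bytes: usize,
}

impl MemoryLimiter {
    fn new(capacity: usize) -> Self {
        MemoryLimiter { inner: Arc::new((Mutex::new(capacity), Condvar::new())) }
    }

    // Block until `bytes` are available, then reserve them.
    fn acquire(&self, bytes: usize) -> Permit {
        let (lock, cvar) = &*self.inner;
        let mut available = lock.lock().unwrap();
        while *available < bytes {
            available = cvar.wait(available).unwrap();
        }
        *available -= bytes;
        Permit { inner: Arc::clone(&self.inner), bytes }
    }

    fn available(&self) -> usize {
        *self.inner.0.lock().unwrap()
    }
}

impl Drop for Permit {
    // Release is automatic: return the bytes and wake any blocked writers,
    // mirroring "memory is released when a batch is completed or dropped".
    fn drop(&mut self) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() += self.bytes;
        cvar.notify_all();
    }
}
```

Tying the permit's lifetime to the batch means a completed, failed, or cancelled batch can never leak its reservation.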
The accumulator now enforces a memory budget (default 64 MB) across all buffered batches. When the buffer is full, `upsert()`/`append()` block until in-flight batches complete and release memory. This prevents unbounded memory growth when the application writes faster than the network can drain. The `MemoryLimiter` is semaphore-style, with byte granularity and RAII permits: memory is released automatically when a batch is completed or dropped.

Where we diverged from Java
Sequence assignment timing. Java assigns sequences at `append()` time; its sender thread blocks at startup until the writer ID is allocated, so the ID is always available. Our `append()` is sync on the caller's thread, with no guarantee that the async writer ID allocation has completed yet, so we assign at `drain()` time instead, since the sender has already ensured the ID exists before draining: the sender calls `maybe_wait_for_writer_id()` before each drain cycle, which allocates the ID via the `InitWriter` RPC if needed, guaranteeing it exists before any batch is stamped.

Sender concurrency model. Java's sender calls `sendWriteData()` in a fire-and-forget style, with Netty I/O threads handling responses asynchronously. We use a `FuturesUnordered` event loop in a single tokio task: drain batches, fire all per-leader RPCs concurrently, then interleave response handling with the next drain cycle. This gives the same parallelism without dedicated threads per connection. When one leader is slow, responses from faster leaders still flow in and trigger new drain cycles, i.e. no head-of-line blocking.

Shutdown. Java uses a `volatile boolean forceClose` flag that the sender loop polls. Tokio tasks support external cancellation, so we use `tokio::select!` with a timeout: signal graceful drain, wait up to the timeout for the sender to finish, then `JoinHandle::abort()` if it hasn't. `BroadcastOnce::Drop` automatically notifies any callers awaiting write results when their in-flight futures are cancelled, so no writes silently disappear.
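The drain-time assignment described under "Sequence assignment timing" can be sketched as follows. All names here are hypothetical, the per-bucket manager is collapsed into a single counter, and the async `InitWriter` round-trip is replaced by a synchronous stand-in:

```rust
// Hypothetical batch shape: metadata is unset until drain time.
struct Batch {
    writer_id: Option<i64>,
    batch_sequence: Option<i32>,
}

struct Sender {
    writer_id: Option<i64>,
    pending: Vec<Batch>,
    next_sequence: i32,
}

impl Sender {
    // Stand-in for the async InitWriter RPC: in the real sender this is
    // awaited before each drain cycle (maybe_wait_for_writer_id).
    fn maybe_init_writer_id(&mut self) {
        if self.writer_id.is_none() {
            self.writer_id = Some(42); // would come from the InitWriter response
        }
    }

    // append() stays cheap and sync on the caller's thread:
    // no sequence is assigned here.
    fn append(&mut self, batch: Batch) {
        self.pending.push(batch);
    }

    // drain() runs only after the writer ID is guaranteed to exist,
    // so stamping (writer_id, batch_sequence) is always safe here.
    fn drain(&mut self) -> Vec<Batch> {
        self.maybe_init_writer_id();
        let id = self.writer_id.expect("ensured above");
        let mut out = std::mem::take(&mut self.pending);
        for b in out.iter_mut() {
            b.writer_id = Some(id);
            b.batch_sequence = Some(self.next_sequence);
            self.next_sequence += 1;
        }
        out
    }
}
```

This keeps the caller-facing path free of any dependency on the async allocation, which is the crux of the divergence from Java's append-time stamping.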