From 652c72e1ca6292908e7ebb9621b8b6e61626def8 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 28 Feb 2026 22:29:57 +0100 Subject: [PATCH 1/7] Add request timeout for rpc --- bindings/cpp/include/fluss.hpp | 2 + bindings/cpp/src/ffi_converter.hpp | 1 + bindings/cpp/src/lib.rs | 2 + bindings/python/fluss/__init__.pyi | 32 ++++++++ bindings/python/src/config.rs | 17 +++++ crates/fluss/src/client/connection.rs | 10 ++- crates/fluss/src/config.rs | 8 ++ crates/fluss/src/rpc/error.rs | 4 + crates/fluss/src/rpc/server_connection.rs | 91 +++++++++++++++++++++-- 9 files changed, 160 insertions(+), 7 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index b507da72..efb6a01c 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1000,6 +1000,8 @@ struct Configuration { int64_t writer_batch_timeout_ms{100}; // Connect timeout in milliseconds for TCP transport connect uint64_t connect_timeout_ms{120000}; + // Request timeout in milliseconds for individual RPC calls + uint64_t request_timeout_ms{30000}; // Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth) std::string security_protocol{"PLAINTEXT"}; // SASL mechanism (only "PLAIN" is supported) diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 33757614..440e227a 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -58,6 +58,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; ffi_config.connect_timeout_ms = config.connect_timeout_ms; + ffi_config.request_timeout_ms = config.request_timeout_ms; ffi_config.security_protocol = rust::String(config.security_protocol); ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism); ffi_config.security_sasl_username = rust::String(config.security_sasl_username); diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index c310fc83..2356372c 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -50,6 +50,7 @@ mod ffi { scanner_log_max_poll_records: usize, writer_batch_timeout_ms: i64, connect_timeout_ms: u64, + request_timeout_ms: u64, security_protocol: String, security_sasl_mechanism: String, security_sasl_username: String, @@ -654,6 +655,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency, scanner_log_max_poll_records: config.scanner_log_max_poll_records, connect_timeout_ms: config.connect_timeout_ms, + request_timeout_ms: config.request_timeout_ms, security_protocol: config.security_protocol.to_string(), security_sasl_mechanism: config.security_sasl_mechanism.to_string(), security_sasl_username: config.security_sasl_username.to_string(), diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 4c2142d7..d3563495 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -169,6 +169,38 @@ class Config: def scanner_log_max_poll_records(self) -> int: ... @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... + @property + def writer_batch_timeout_ms(self) -> int: ... + @writer_batch_timeout_ms.setter + def writer_batch_timeout_ms(self, timeout: int) -> None: ... + @property + def connect_timeout_ms(self) -> int: ... + @connect_timeout_ms.setter + def connect_timeout_ms(self, timeout: int) -> None: ... + @property + def request_timeout_ms(self) -> int: ... + @request_timeout_ms.setter + def request_timeout_ms(self, timeout: int) -> None: ... + @property + def writer_bucket_no_key_assigner(self) -> str: ... + @writer_bucket_no_key_assigner.setter + def writer_bucket_no_key_assigner(self, value: str) -> None: ... + @property + def security_protocol(self) -> str: ... + @security_protocol.setter + def security_protocol(self, protocol: str) -> None: ... + @property + def security_sasl_mechanism(self) -> str: ... + @security_sasl_mechanism.setter + def security_sasl_mechanism(self, mechanism: str) -> None: ... + @property + def security_sasl_username(self) -> str: ... + @security_sasl_username.setter + def security_sasl_username(self, username: str) -> None: ... + @property + def security_sasl_password(self) -> str: ... + @security_sasl_password.setter + def security_sasl_password(self, password: str) -> None: ... class FlussConnection: @staticmethod diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 4582d43d..6566b1ae 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -113,6 +113,11 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "request-timeout-ms" => { + config.request_timeout_ms = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } "security.protocol" => { config.security_protocol = value; } @@ -267,6 +272,18 @@ impl Config { self.inner.connect_timeout_ms = timeout; } + /// Get the request timeout in milliseconds + #[getter] + fn request_timeout_ms(&self) -> u64 { + self.inner.request_timeout_ms + } + + /// Set the request timeout in milliseconds + #[setter] + fn set_request_timeout_ms(&mut self, timeout: u64) { + self.inner.request_timeout_ms = timeout; + } + /// Get the security protocol #[getter] fn security_protocol(&self) -> String { diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 703b5886..e3fd4ce0 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -41,6 +41,7 @@ impl FlussConnection { .map_err(|msg| Error::IllegalArgument { message: msg })?; let timeout = Duration::from_millis(arg.connect_timeout_ms); + let request_timeout = Duration::from_millis(arg.request_timeout_ms); let connections = if arg.is_sasl_enabled() { Arc::new( RpcClient::new() @@ -48,10 +49,15 @@ impl FlussConnection { arg.security_sasl_username.clone(), arg.security_sasl_password.clone(), ) - .with_timeout(timeout), + .with_timeout(timeout) + .with_request_timeout(request_timeout), ) } else { - Arc::new(RpcClient::new().with_timeout(timeout)) + Arc::new( + RpcClient::new() + .with_timeout(timeout) + .with_request_timeout(request_timeout), + ) }; let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?; diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 438c9483..78d765a0 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -31,6 +31,7 @@ const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_ACKS: &str = "all"; const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000; +const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000; const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT"; const DEFAULT_SASL_MECHANISM: &str = "PLAIN"; @@ -105,6 +106,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)] pub connect_timeout_ms: u64, + /// Request timeout in milliseconds for individual RPC calls. + /// Default: 30000 (30 seconds). + #[arg(long, default_value_t = DEFAULT_REQUEST_TIMEOUT_MS)] + pub request_timeout_ms: u64, + #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))] pub security_protocol: String, @@ -145,6 +151,7 @@ impl std::fmt::Debug for Config { ) .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms) .field("connect_timeout_ms", &self.connect_timeout_ms) + .field("request_timeout_ms", &self.request_timeout_ms) .field("security_protocol", &self.security_protocol) .field("security_sasl_mechanism", &self.security_sasl_mechanism) .field("security_sasl_username", &self.security_sasl_username) @@ -168,6 +175,7 @@ impl Default for Config { scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS, + request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS, security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL), security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM), security_sasl_username: String::new(), diff --git a/crates/fluss/src/rpc/error.rs b/crates/fluss/src/rpc/error.rs index da3a11e2..85be83ed 100644 --- a/crates/fluss/src/rpc/error.rs +++ b/crates/fluss/src/rpc/error.rs @@ -19,6 +19,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::api_version::ApiVersion; use prost::DecodeError; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; #[derive(Error, Debug)] @@ -51,4 +52,7 @@ pub enum RpcError { api_key: ApiKey, api_version: ApiVersion, }, + + #[error("Request timed out after {timeout:?} for api_key={api_key:?}")] + RequestTimeout { timeout: Duration, api_key: ApiKey }, } diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 13c5d9ca..8617df05 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -71,6 +71,7 @@ pub struct RpcClient { connections: RwLock>, client_id: Arc, timeout: Option, + request_timeout: Option, max_message_size: usize, sasl_config: Option, } @@ -81,6 +82,7 @@ impl RpcClient { connections: Default::default(), client_id: Arc::from(""), timeout: None, + request_timeout: None, max_message_size: usize::MAX, sasl_config: None, } @@ -91,6 +93,11 @@ impl RpcClient { self } + pub fn with_request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = Some(timeout); + self + } + pub fn with_sasl(mut self, username: String, password: String) -> Self { self.sasl_config = Some(SaslConfig { username, password }); self @@ -133,6 +140,7 @@ impl RpcClient { BufStream::new(transport), self.max_message_size, self.client_id.clone(), + self.request_timeout, ); let connection = ServerConnection::new(messenger); @@ -266,6 +274,8 @@ pub struct ServerConnectionInner { state: Arc>, + request_timeout: Option, + join_handle: JoinHandle<()>, } @@ -273,7 +283,12 @@ impl ServerConnectionInner where RW: AsyncRead + AsyncWrite + Send + 'static, { - pub fn new(stream: RW, max_message_size: usize, client_id: Arc) -> Self { + pub fn new( + stream: RW, + max_message_size: usize, + client_id: Arc, + request_timeout: Option, + ) -> Self { let (stream_read, stream_write) = tokio::io::split(stream); let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default()))); let state_captured = Arc::clone(&state); @@ -337,6 +352,7 @@ where client_id, request_id: AtomicI32::new(0), state, + request_timeout, join_handle, } } @@ -388,10 +404,28 @@ where self.send_message(buf).await?; _cleanup_on_cancel.message_sent(); - let mut response = rx.await.map_err(|e| Error::UnexpectedError { - message: "Got recvError, some one close the channel".to_string(), - source: Some(Box::new(e)), - })??; + let mut response = match self.request_timeout { + Some(timeout) => match tokio::time::timeout(timeout, rx).await { + Ok(result) => result.map_err(|e| Error::UnexpectedError { + message: "Got recvError, some one close the channel".to_string(), + source: Some(Box::new(e)), + })??, + Err(_elapsed) => { + if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() { + map.remove(&request_id); + } + return Err(RpcError::RequestTimeout { + timeout, + api_key: R::API_KEY, + } + .into()); + } + }, + None => rx.await.map_err(|e| Error::UnexpectedError { + message: "Got recvError, some one close the channel".to_string(), + source: Some(Box::new(e)), + })??, + }; if let Some(error_response) = response.header.error_response { return Err(Error::FlussAPIError { @@ -561,3 +595,50 @@ impl Drop for CleanupRequestStateOnCancel { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::TablePath; + use crate::rpc::message::TableExistsRequest; + + #[tokio::test] + async fn test_request_timeout() { + // Create a duplex stream where the "server" side never responds. + let (client_stream, _server_stream) = tokio::io::duplex(1024); + + let conn = ServerConnectionInner::new( + BufStream::new(client_stream), + usize::MAX, + Arc::from("test"), + Some(Duration::from_millis(50)), + ); + + let table_path = TablePath::new("db", "table"); + let request = TableExistsRequest::new(&table_path); + let result = conn.request(request).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + let err_msg = err.to_string(); + assert!( + err_msg.contains("timed out"), + "expected timeout error, got: {err_msg}" + ); + } + + #[tokio::test] + async fn test_request_no_timeout() { + // With None timeout, verify a connection can still be constructed without panics. + let (client_stream, _server_stream) = tokio::io::duplex(1024); + + let conn = ServerConnectionInner::new( + BufStream::new(client_stream), + usize::MAX, + Arc::from("test"), + None, + ); + + assert!(!conn.is_poisoned()); + } +} From d14b7a3041970c3f29edd44f5295e3758be1f3b4 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 28 Feb 2026 22:33:42 +0100 Subject: [PATCH 2/7] update pyi --- bindings/python/fluss/__init__.pyi | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index d3563495..61aca816 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -170,37 +170,9 @@ class Config: @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... @property - def writer_batch_timeout_ms(self) -> int: ... - @writer_batch_timeout_ms.setter - def writer_batch_timeout_ms(self, timeout: int) -> None: ... - @property - def connect_timeout_ms(self) -> int: ... - @connect_timeout_ms.setter - def connect_timeout_ms(self, timeout: int) -> None: ... - @property def request_timeout_ms(self) -> int: ... @request_timeout_ms.setter def request_timeout_ms(self, timeout: int) -> None: ... - @property - def writer_bucket_no_key_assigner(self) -> str: ... - @writer_bucket_no_key_assigner.setter - def writer_bucket_no_key_assigner(self, value: str) -> None: ... - @property - def security_protocol(self) -> str: ... - @security_protocol.setter - def security_protocol(self, protocol: str) -> None: ... - @property - def security_sasl_mechanism(self) -> str: ... - @security_sasl_mechanism.setter - def security_sasl_mechanism(self, mechanism: str) -> None: ... - @property - def security_sasl_username(self) -> str: ... - @security_sasl_username.setter - def security_sasl_username(self, username: str) -> None: ... - @property - def security_sasl_password(self) -> str: ... - @security_sasl_password.setter - def security_sasl_password(self, password: str) -> None: ... class FlussConnection: @staticmethod From 463ef9233ed1f4e975cbf91f36cd3b9a98ced253 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sun, 1 Mar 2026 11:21:22 +0100 Subject: [PATCH 3/7] address reviews --- bindings/python/src/config.rs | 2 +- crates/fluss/src/rpc/server_connection.rs | 52 +++++++++++++++---- .../python/example/configuration.md | 1 + 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 6566b1ae..a7a9b729 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -113,7 +113,7 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } - "request-timeout-ms" => { + "request-timeout" | "request-timeout-ms" => { config.request_timeout_ms = value.parse::().map_err(|e| { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 8617df05..b8c5ebdd 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -274,6 +274,10 @@ pub struct ServerConnectionInner { state: Arc>, + /// Per-request timeout applied to the response-wait phase only. + /// The send (write) phase is not covered; a stalled `send_message` can + /// exceed this duration. + /// TODO: Full RPC deadline semantics are a potential future enhancement. request_timeout: Option, join_handle: JoinHandle<()>, @@ -316,9 +320,9 @@ where match map.remove(&header.request_id) { Some(active_request) => active_request, _ => { - log::warn!( + log::debug!( request_id:% = header.request_id; - "Got response for unknown request", + "Ignoring response for unknown request (likely timed out or cancelled)", ); continue; } @@ -407,7 +411,11 @@ where let mut response = match self.request_timeout { Some(timeout) => match tokio::time::timeout(timeout, rx).await { Ok(result) => result.map_err(|e| Error::UnexpectedError { - message: "Got recvError, some one close the channel".to_string(), + message: format!( + "Response channel closed for request_id={request_id} api_key={:?}; \ + connection may be closed or poisoned", + R::API_KEY + ), source: Some(Box::new(e)), })??, Err(_elapsed) => { @@ -422,7 +430,11 @@ where } }, None => rx.await.map_err(|e| Error::UnexpectedError { - message: "Got recvError, some one close the channel".to_string(), + message: format!( + "Response channel closed for request_id={request_id} api_key={:?}; \ + connection may be closed or poisoned", + R::API_KEY + ), source: Some(Box::new(e)), })??, }; @@ -620,16 +632,32 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err(); - let err_msg = err.to_string(); assert!( - err_msg.contains("timed out"), - "expected timeout error, got: {err_msg}" + matches!( + err, + Error::RpcError { + source: RpcError::RequestTimeout { .. }, + .. + } + ), + "expected RequestTimeout, got: {err}" ); + + // Timeout must not poison the connection — other requests should still work. + assert!(!conn.is_poisoned()); + + // The timed-out request must be removed from the request map (no state leak). + if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() { + assert!(map.is_empty(), "request map should be empty after timeout"); + } else { + panic!("connection should not be poisoned after a timeout"); + } } #[tokio::test] async fn test_request_no_timeout() { - // With None timeout, verify a connection can still be constructed without panics. + // With no request timeout configured, request should remain pending + // when the server does not respond. let (client_stream, _server_stream) = tokio::io::duplex(1024); let conn = ServerConnectionInner::new( @@ -639,6 +667,12 @@ mod tests { None, ); - assert!(!conn.is_poisoned()); + let table_path = TablePath::new("db", "table"); + let request = TableExistsRequest::new(&table_path); + let pending = tokio::time::timeout(Duration::from_millis(50), conn.request(request)).await; + assert!( + pending.is_err(), + "expected request to remain pending without per-request timeout" + ); } } diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index 90b1249c..d73a3717 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -35,6 +35,7 @@ with await fluss.FlussConnection.create(config) as conn: | `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` | | `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | | `connect-timeout` | TCP connect timeout in milliseconds | `120000` | +| `request-timeout` | Per-request RPC timeout in milliseconds (also accepts `request-timeout-ms`) | `30000` | | `security.protocol` | `PLAINTEXT` (default) or `sasl` for SASL auth | `PLAINTEXT` | | `security.sasl.mechanism` | SASL mechanism (only `PLAIN` is supported) | `PLAIN` | | `security.sasl.username` | SASL username (required when protocol is `sasl`) | (empty) | From 2e3b23c3488fee00e858ecae1d29026b04de0f79 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Wed, 4 Mar 2026 08:15:35 +0100 Subject: [PATCH 4/7] add mising property --- bindings/python/fluss/__init__.pyi | 1 + 1 file changed, 1 insertion(+) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 5e29ae43..17a784ff 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -173,6 +173,7 @@ class Config: def request_timeout_ms(self) -> int: ... @request_timeout_ms.setter def request_timeout_ms(self, timeout: int) -> None: ... + @property def writer_batch_timeout_ms(self) -> int: ... @writer_batch_timeout_ms.setter def writer_batch_timeout_ms(self, timeout: int) -> None: ... From 2dcf00dfb224ac6dd75edde50ec2d42de711c82f Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Wed, 4 Mar 2026 09:25:26 +0100 Subject: [PATCH 5/7] update rust docs --- website/docs/user-guide/rust/api-reference.md | 1 + website/docs/user-guide/rust/example/configuration.md | 1 + 2 files changed, 2 insertions(+) diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index d539a860..2375e92e 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -21,6 +21,7 @@ Complete API reference for the Fluss Rust client. | `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single poll() | | `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds | +| `request_timeout_ms` | `u64` | `30000` | Per-request RPC timeout in milliseconds | | `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth | | `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) | | `security_sasl_username` | `String` | (empty) | SASL username (required when protocol is `sasl`) | diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md index f6340c97..1d9484d5 100644 --- a/website/docs/user-guide/rust/example/configuration.md +++ b/website/docs/user-guide/rust/example/configuration.md @@ -31,6 +31,7 @@ let conn = FlussConnection::new(config).await?; | `scanner_remote_log_read_concurrency` | Streaming read concurrency within a remote log file | `4` | | `scanner_log_max_poll_records` | Maximum records returned in a single `poll()` | `500` | | `connect_timeout_ms` | TCP connect timeout in milliseconds | 120000 | +| `request_timeout_ms` | Per-request RPC timeout in milliseconds | 30000 | | `security_protocol` | `PLAINTEXT` (default) or `sasl` for SASL auth | `PLAINTEXT` | | `security_sasl_mechanism` | SASL mechanism (only `PLAIN` is supported) | `PLAIN` | | `security_sasl_username` | SASL username (required when protocol is `sasl`) | (empty) | From 6fca96446bda0b41f80377dadd38a97c72cfc71d Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 7 Mar 2026 11:38:06 +0100 Subject: [PATCH 6/7] add request-timeout-ms to api reference --- website/docs/user-guide/python/api-reference.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index e9113b69..872debf3 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -22,6 +22,7 @@ Complete API reference for the Fluss Python client. | `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single poll() | | `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds | +| `request_timeout_ms` | `request-timeout` | Get/set per-request RPC timeout in milliseconds | | `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) | | `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) | From 0e446f8bc8c6184e2e042f15c03f5adc4b1ebee5 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Tue, 10 Mar 2026 20:48:18 +0100 Subject: [PATCH 7/7] address comments --- crates/fluss/src/rpc/server_connection.rs | 11 ++++++++--- website/docs/user-guide/cpp/api-reference.md | 2 +- website/docs/user-guide/python/api-reference.md | 2 +- website/docs/user-guide/rust/api-reference.md | 6 +++++- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 37f70611..1a2fe16c 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -88,6 +88,11 @@ impl RpcClient { } } + /// Set the timeout used when establishing TCP connections. + /// + /// Compatibility note: this builder was previously named `with_timeout`. + /// It was renamed to make timeout semantics explicit now that + /// `with_request_timeout` is also available. pub fn with_connect_timeout(mut self, timeout: Duration) -> Self { self.connect_timeout = Some(timeout); self @@ -618,7 +623,7 @@ mod tests { BufStream::new(client_stream), usize::MAX, Arc::from("test"), - Some(Duration::from_millis(50)), + Some(Duration::from_millis(500)), ); let table_path = TablePath::new("db", "table"); @@ -664,10 +669,10 @@ mod tests { let table_path = TablePath::new("db", "table"); let request = TableExistsRequest::new(&table_path); - let pending = tokio::time::timeout(Duration::from_millis(50), conn.request(request)).await; + let pending = tokio::time::timeout(Duration::from_millis(300), conn.request(request)).await; assert!( pending.is_err(), - "expected request to remain pending without per-request timeout" + "expected request to remain pending without per-request timeout", ); } } diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index 83264658..beaeb099 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -29,7 +29,7 @@ Complete API reference for the Fluss C++ client. | `scanner_remote_log_read_concurrency` | `size_t` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned in a single Poll() | | `connect_timeout_ms` | `uint64_t` | `120000` | TCP connect timeout in milliseconds | -| `request_timeout_ms` | `uint64_t` | `30000` | Per-request RPC timeout in milliseconds | +| `request_timeout_ms` | `uint64_t` | `30000` | Timeout in ms while waiting for an RPC response after the request is sent (request write/send can take longer) | | `security_protocol` | `std::string` | `"PLAINTEXT"` | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth | | `security_sasl_mechanism` | `std::string` | `"PLAIN"` | SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `std::string` | (empty) | SASL username (required when protocol is `"sasl"`) | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 872debf3..5ade3073 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -22,7 +22,7 @@ Complete API reference for the Fluss Python client. | `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single poll() | | `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds | -| `request_timeout_ms` | `request-timeout` | Get/set per-request RPC timeout in milliseconds | +| `request_timeout_ms` | `request-timeout` | Get/set max time in ms to wait for an RPC response after the request is sent (does not limit request write/send time) | | `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) | | `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 2375e92e..c7a971c6 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -5,6 +5,10 @@ sidebar_position: 2 Complete API reference for the Fluss Rust client. +> Compatibility note: `RpcClient::with_timeout` was renamed to +> `RpcClient::with_connect_timeout` to distinguish it from +> `RpcClient::with_request_timeout`. + ## `Config` | Field | Type | Default | Description | @@ -21,7 +25,7 @@ Complete API reference for the Fluss Rust client. | `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single poll() | | `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds | -| `request_timeout_ms` | `u64` | `30000` | Per-request RPC timeout in milliseconds | +| `request_timeout_ms` | `u64` | `30000` | Timeout in ms while waiting for an RPC response after the request is sent (request write/send can take longer) | | `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth | | `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) | | `security_sasl_username` | `String` | (empty) | SASL username (required when protocol is `sasl`) |