diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index b507da72..efb6a01c 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1000,6 +1000,8 @@ struct Configuration { int64_t writer_batch_timeout_ms{100}; // Connect timeout in milliseconds for TCP transport connect uint64_t connect_timeout_ms{120000}; + // Request timeout in milliseconds for individual RPC calls + uint64_t request_timeout_ms{30000}; // Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth) std::string security_protocol{"PLAINTEXT"}; // SASL mechanism (only "PLAIN" is supported) diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 93a60bf6..60628a5e 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -66,6 +66,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; ffi_config.connect_timeout_ms = config.connect_timeout_ms; + ffi_config.request_timeout_ms = config.request_timeout_ms; ffi_config.security_protocol = rust::String(config.security_protocol); ffi_config.security_sasl_mechanism = rust::String(config.security_sasl_mechanism); ffi_config.security_sasl_username = rust::String(config.security_sasl_username); diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 36b9c516..fef35ab1 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -50,6 +50,7 @@ mod ffi { scanner_log_max_poll_records: usize, writer_batch_timeout_ms: i64, connect_timeout_ms: u64, + request_timeout_ms: u64, security_protocol: String, security_sasl_mechanism: String, security_sasl_username: String, @@ -669,6 +670,7 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency, scanner_log_max_poll_records: config.scanner_log_max_poll_records, connect_timeout_ms: config.connect_timeout_ms, + request_timeout_ms: config.request_timeout_ms, security_protocol: config.security_protocol.to_string(), security_sasl_mechanism: config.security_sasl_mechanism.to_string(), security_sasl_username: config.security_sasl_username.to_string(), diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 417ac9b2..2b6a7beb 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -174,6 +174,10 @@ class Config: @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... @property + def request_timeout_ms(self) -> int: ... + @request_timeout_ms.setter + def request_timeout_ms(self, timeout: int) -> None: ... + @property def writer_batch_timeout_ms(self) -> int: ... @writer_batch_timeout_ms.setter def writer_batch_timeout_ms(self, timeout: int) -> None: ... diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index f99f9c63..9a03d168 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -110,6 +110,11 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "request-timeout" => { + config.request_timeout_ms = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } "security.protocol" => { config.security_protocol = value; } @@ -282,6 +287,18 @@ impl Config { self.inner.connect_timeout_ms = timeout; } + /// Get the request timeout in milliseconds + #[getter] + fn request_timeout_ms(&self) -> u64 { + self.inner.request_timeout_ms + } + + /// Set the request timeout in milliseconds + #[setter] + fn set_request_timeout_ms(&mut self, timeout: u64) { + self.inner.request_timeout_ms = timeout; + } + /// Get the security protocol #[getter] fn security_protocol(&self) -> String { diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 703b5886..9ff8a239 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -41,6 +41,7 @@ impl FlussConnection { .map_err(|msg| Error::IllegalArgument { message: msg })?; let timeout = Duration::from_millis(arg.connect_timeout_ms); + let request_timeout = Duration::from_millis(arg.request_timeout_ms); let connections = if arg.is_sasl_enabled() { Arc::new( RpcClient::new() @@ -48,10 +49,15 @@ impl FlussConnection { arg.security_sasl_username.clone(), arg.security_sasl_password.clone(), ) - .with_timeout(timeout), + .with_connect_timeout(timeout) + .with_request_timeout(request_timeout), ) } else { - Arc::new(RpcClient::new().with_timeout(timeout)) + Arc::new( + RpcClient::new() + .with_connect_timeout(timeout) + .with_request_timeout(request_timeout), + ) }; let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?; diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 08ffbfae..336e64dd 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -31,6 +31,7 @@ const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; const DEFAULT_ACKS: &str = "all"; const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000; +const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000; const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT"; const DEFAULT_SASL_MECHANISM: &str = "PLAIN"; @@ -101,6 +102,11 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)] pub connect_timeout_ms: u64, + /// Request timeout in milliseconds for individual RPC calls. + /// Default: 30000 (30 seconds). + #[arg(long, default_value_t = DEFAULT_REQUEST_TIMEOUT_MS)] + pub request_timeout_ms: u64, + #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))] pub security_protocol: String, @@ -141,6 +147,7 @@ impl std::fmt::Debug for Config { ) .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms) .field("connect_timeout_ms", &self.connect_timeout_ms) + .field("request_timeout_ms", &self.request_timeout_ms) .field("security_protocol", &self.security_protocol) .field("security_sasl_mechanism", &self.security_sasl_mechanism) .field("security_sasl_username", &self.security_sasl_username) @@ -164,6 +171,7 @@ impl Default for Config { scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS, + request_timeout_ms: DEFAULT_REQUEST_TIMEOUT_MS, security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL), security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM), security_sasl_username: String::new(), diff --git a/crates/fluss/src/rpc/error.rs b/crates/fluss/src/rpc/error.rs index da3a11e2..85be83ed 100644 --- a/crates/fluss/src/rpc/error.rs +++ b/crates/fluss/src/rpc/error.rs @@ -19,6 +19,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::api_version::ApiVersion; use prost::DecodeError; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; #[derive(Error, Debug)] @@ -51,4 +52,7 @@ pub enum RpcError { api_key: ApiKey, api_version: ApiVersion, }, + + #[error("Request timed out after {timeout:?} for api_key={api_key:?}")] + RequestTimeout { timeout: Duration, api_key: ApiKey }, } diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 13c5d9ca..37f70611 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -70,7 +70,8 @@ impl fmt::Debug for SaslConfig { pub struct RpcClient { connections: RwLock>, client_id: Arc, - timeout: Option, + connect_timeout: Option, + request_timeout: Option, max_message_size: usize, sasl_config: Option, } @@ -80,14 +81,20 @@ impl RpcClient { RpcClient { connections: Default::default(), client_id: Arc::from(""), - timeout: None, + connect_timeout: None, + request_timeout: None, max_message_size: usize::MAX, sasl_config: None, } } - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(timeout); + pub fn with_connect_timeout(mut self, timeout: Duration) -> Self { + self.connect_timeout = Some(timeout); + self + } + + pub fn with_request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = Some(timeout); self } @@ -125,7 +132,7 @@ impl RpcClient { async fn connect(&self, server_node: &ServerNode) -> Result { let url = server_node.url(); - let transport = Transport::connect(&url, self.timeout) + let transport = Transport::connect(&url, self.connect_timeout) .await .map_err(|error| ConnectionError(error.to_string()))?; @@ -133,6 +140,7 @@ impl RpcClient { BufStream::new(transport), self.max_message_size, self.client_id.clone(), + self.request_timeout, ); let connection = ServerConnection::new(messenger); @@ -266,6 +274,12 @@ pub struct ServerConnectionInner { state: Arc>, + /// Per-request timeout applied to the response-wait phase only. + /// The send (write) phase is not covered; a stalled `send_message` can + /// exceed this duration. + /// TODO: Full RPC deadline semantics are a potential future enhancement. + request_timeout: Option, + join_handle: JoinHandle<()>, } @@ -273,7 +287,12 @@ impl ServerConnectionInner where RW: AsyncRead + AsyncWrite + Send + 'static, { - pub fn new(stream: RW, max_message_size: usize, client_id: Arc) -> Self { + pub fn new( + stream: RW, + max_message_size: usize, + client_id: Arc, + request_timeout: Option, + ) -> Self { let (stream_read, stream_write) = tokio::io::split(stream); let state = Arc::new(Mutex::new(ConnectionState::RequestMap(HashMap::default()))); let state_captured = Arc::clone(&state); @@ -301,9 +320,9 @@ where match map.remove(&header.request_id) { Some(active_request) => active_request, _ => { - log::warn!( + log::debug!( request_id:% = header.request_id; - "Got response for unknown request", + "Ignoring response for unknown request (likely timed out or cancelled)", ); continue; } @@ -337,6 +356,7 @@ where client_id, request_id: AtomicI32::new(0), state, + request_timeout, join_handle, } } @@ -388,8 +408,29 @@ where self.send_message(buf).await?; _cleanup_on_cancel.message_sent(); - let mut response = rx.await.map_err(|e| Error::UnexpectedError { - message: "Got recvError, some one close the channel".to_string(), + let recv_result = match self.request_timeout { + Some(timeout) => match tokio::time::timeout(timeout, rx).await { + Ok(result) => result, + Err(_elapsed) => { + if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() { + map.remove(&request_id); + } + return Err(RpcError::RequestTimeout { + timeout, + api_key: R::API_KEY, + } + .into()); + } + }, + None => rx.await, + }; + + let mut response = recv_result.map_err(|e| Error::UnexpectedError { + message: format!( + "Response channel closed for request_id={request_id} api_key={:?}; \ + connection may be closed or poisoned", + R::API_KEY + ), source: Some(Box::new(e)), })??; @@ -561,3 +602,72 @@ impl Drop for CleanupRequestStateOnCancel { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::TablePath; + use crate::rpc::message::TableExistsRequest; + + #[tokio::test] + async fn test_request_timeout() { + // Create a duplex stream where the "server" side never responds. + let (client_stream, _server_stream) = tokio::io::duplex(1024); + + let conn = ServerConnectionInner::new( + BufStream::new(client_stream), + usize::MAX, + Arc::from("test"), + Some(Duration::from_millis(50)), + ); + + let table_path = TablePath::new("db", "table"); + let request = TableExistsRequest::new(&table_path); + let result = conn.request(request).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + matches!( + err, + Error::RpcError { + source: RpcError::RequestTimeout { .. }, + .. + } + ), + "expected RequestTimeout, got: {err}" + ); + + // Timeout must not poison the connection — other requests should still work. + assert!(!conn.is_poisoned()); + + // The timed-out request must be removed from the request map (no state leak). + if let ConnectionState::RequestMap(map) = conn.state.lock().deref_mut() { + assert!(map.is_empty(), "request map should be empty after timeout"); + } else { + panic!("connection should not be poisoned after a timeout"); + } + } + + #[tokio::test] + async fn test_request_no_timeout() { + // With no request timeout configured, request should remain pending + // when the server does not respond. + let (client_stream, _server_stream) = tokio::io::duplex(1024); + + let conn = ServerConnectionInner::new( + BufStream::new(client_stream), + usize::MAX, + Arc::from("test"), + None, + ); + + let table_path = TablePath::new("db", "table"); + let request = TableExistsRequest::new(&table_path); + let pending = tokio::time::timeout(Duration::from_millis(50), conn.request(request)).await; + assert!( + pending.is_err(), + "expected request to remain pending without per-request timeout" + ); + } +} diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index debd311d..83264658 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -29,6 +29,7 @@ Complete API reference for the Fluss C++ client. | `scanner_remote_log_read_concurrency` | `size_t` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned in a single Poll() | | `connect_timeout_ms` | `uint64_t` | `120000` | TCP connect timeout in milliseconds | +| `request_timeout_ms` | `uint64_t` | `30000` | Per-request RPC timeout in milliseconds | | `security_protocol` | `std::string` | `"PLAINTEXT"` | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth | | `security_sasl_mechanism` | `std::string` | `"PLAIN"` | SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `std::string` | (empty) | SASL username (required when protocol is `"sasl"`) | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index e9113b69..872debf3 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -22,6 +22,7 @@ Complete API reference for the Fluss Python client. | `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single poll() | | `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds | +| `request_timeout_ms` | `request-timeout` | Get/set per-request RPC timeout in milliseconds | | `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) | | `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) | | `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index d539a860..2375e92e 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -21,6 +21,7 @@ Complete API reference for the Fluss Rust client. | `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single poll() | | `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds | +| `request_timeout_ms` | `u64` | `30000` | Per-request RPC timeout in milliseconds | | `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth | | `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) | | `security_sasl_username` | `String` | (empty) | SASL username (required when protocol is `sasl`) |