diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index b507da72..5029d9c3 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -997,6 +997,14 @@ struct Configuration { size_t scanner_remote_log_read_concurrency{4}; // Maximum number of records returned in a single call to Poll() for LogScanner size_t scanner_log_max_poll_records{500}; + // Maximum bytes per fetch response for LogScanner (16 MB) + int32_t scanner_log_fetch_max_bytes{16 * 1024 * 1024}; + // Minimum bytes to accumulate before server returns a fetch response + int32_t scanner_log_fetch_min_bytes{1}; + // Maximum time (ms) the server may wait to satisfy min bytes + int32_t scanner_log_fetch_wait_max_time_ms{500}; + // Maximum bytes per fetch response per bucket for LogScanner (1 MB) + int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024}; int64_t writer_batch_timeout_ms{100}; // Connect timeout in milliseconds for TCP transport connect uint64_t connect_timeout_ms{120000}; diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 33757614..c55e8a69 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -56,6 +56,10 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency; ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records; + ffi_config.scanner_log_fetch_max_bytes = config.scanner_log_fetch_max_bytes; + ffi_config.scanner_log_fetch_min_bytes = config.scanner_log_fetch_min_bytes; + ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms; + ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket; ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; ffi_config.connect_timeout_ms = config.connect_timeout_ms; ffi_config.security_protocol = rust::String(config.security_protocol); diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index c310fc83..982d8f02 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -48,6 +48,10 @@ mod ffi { remote_file_download_thread_num: usize, scanner_remote_log_read_concurrency: usize, scanner_log_max_poll_records: usize, + scanner_log_fetch_max_bytes: i32, + scanner_log_fetch_min_bytes: i32, + scanner_log_fetch_wait_max_time_ms: i32, + scanner_log_fetch_max_bytes_for_bucket: i32, writer_batch_timeout_ms: i64, connect_timeout_ms: u64, security_protocol: String, @@ -653,6 +657,10 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> { remote_file_download_thread_num: config.remote_file_download_thread_num, scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency, scanner_log_max_poll_records: config.scanner_log_max_poll_records, + scanner_log_fetch_max_bytes: config.scanner_log_fetch_max_bytes, + scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes, + scanner_log_fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms, + scanner_log_fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket, connect_timeout_ms: config.connect_timeout_ms, security_protocol: config.security_protocol.to_string(), security_sasl_mechanism: config.security_sasl_mechanism.to_string(), diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index c387d734..bc20717f 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -170,6 +170,22 @@ class Config: @scanner_log_max_poll_records.setter def scanner_log_max_poll_records(self, num: int) -> None: ... @property + def scanner_log_fetch_max_bytes(self) -> int: ... + @scanner_log_fetch_max_bytes.setter + def scanner_log_fetch_max_bytes(self, bytes: int) -> None: ... + @property + def scanner_log_fetch_min_bytes(self) -> int: ... + @scanner_log_fetch_min_bytes.setter + def scanner_log_fetch_min_bytes(self, bytes: int) -> None: ... + @property + def scanner_log_fetch_wait_max_time_ms(self) -> int: ... + @scanner_log_fetch_wait_max_time_ms.setter + def scanner_log_fetch_wait_max_time_ms(self, ms: int) -> None: ... + @property + def scanner_log_fetch_max_bytes_for_bucket(self) -> int: ... + @scanner_log_fetch_max_bytes_for_bucket.setter + def scanner_log_fetch_max_bytes_for_bucket(self, bytes: int) -> None: ... + @property def writer_batch_timeout_ms(self) -> int: ... @writer_batch_timeout_ms.setter def writer_batch_timeout_ms(self, timeout: int) -> None: ... diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 4582d43d..ec16dee3 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -97,6 +97,32 @@ impl Config { )) })?; } + "scanner.log.fetch.max-bytes" => { + config.scanner_log_fetch_max_bytes = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } + "scanner.log.fetch.min-bytes" => { + config.scanner_log_fetch_min_bytes = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } + "scanner.log.fetch.wait-max-time-ms" => { + config.scanner_log_fetch_wait_max_time_ms = + value.parse::().map_err(|e| { + FlussError::new_err(format!( + "Invalid value '{value}' for '{key}': {e}" + )) + })?; + } + "scanner.log.fetch.max-bytes-for-bucket" => { + config.scanner_log_fetch_max_bytes_for_bucket = + value.parse::().map_err(|e| { + FlussError::new_err(format!( + "Invalid value '{value}' for '{key}': {e}" + )) + })?; + } "writer.bucket.no-key-assigner" => { config.writer_bucket_no_key_assigner = match value.as_str() { "round_robin" => fcore::config::NoKeyAssigner::RoundRobin, @@ -314,6 +340,54 @@ impl Config { fn set_security_sasl_password(&mut self, password: String) { self.inner.security_sasl_password = password; } + + /// Get the maximum bytes per fetch response for LogScanner + #[getter] + fn scanner_log_fetch_max_bytes(&self) -> i32 { + self.inner.scanner_log_fetch_max_bytes + } + + /// Set the maximum bytes per fetch response for LogScanner + #[setter] + fn set_scanner_log_fetch_max_bytes(&mut self, bytes: i32) { + self.inner.scanner_log_fetch_max_bytes = bytes; + } + + /// Get the minimum bytes to accumulate before returning a fetch response + #[getter] + fn scanner_log_fetch_min_bytes(&self) -> i32 { + self.inner.scanner_log_fetch_min_bytes + } + + /// Set the minimum bytes to accumulate before returning a fetch response + #[setter] + fn set_scanner_log_fetch_min_bytes(&mut self, bytes: i32) { + self.inner.scanner_log_fetch_min_bytes = bytes; + } + + /// Get the maximum time (ms) the server may wait to satisfy min-bytes + #[getter] + fn scanner_log_fetch_wait_max_time_ms(&self) -> i32 { + self.inner.scanner_log_fetch_wait_max_time_ms + } + + /// Set the maximum time (ms) the server may wait to satisfy min-bytes + #[setter] + fn set_scanner_log_fetch_wait_max_time_ms(&mut self, ms: i32) { + self.inner.scanner_log_fetch_wait_max_time_ms = ms; + } + + /// Get the maximum bytes per fetch response per bucket for LogScanner + #[getter] + fn scanner_log_fetch_max_bytes_for_bucket(&self) -> i32 { + self.inner.scanner_log_fetch_max_bytes_for_bucket + } + + /// Set the maximum bytes per fetch response per bucket for LogScanner + #[setter] + fn set_scanner_log_fetch_max_bytes_for_bucket(&mut self, bytes: i32) { + self.inner.scanner_log_fetch_max_bytes_for_bucket = bytes; + } } impl Config { diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 703b5886..2610f6da 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -39,6 +39,8 @@ impl FlussConnection { pub async fn new(arg: Config) -> Result { arg.validate_security() .map_err(|msg| Error::IllegalArgument { message: msg })?; + arg.validate_scanner_fetch() + .map_err(|msg| Error::IllegalArgument { message: msg })?; let timeout = Duration::from_millis(arg.connect_timeout_ms); let connections = if arg.is_sasl_enabled() { diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index e837ba76..43025393 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -15,17 +15,6 @@ // specific language governing permissions and limitations // under the License. -use arrow_schema::SchemaRef; -use log::{debug, warn}; -use parking_lot::{Mutex, RwLock}; -use std::{ - collections::{HashMap, HashSet}, - slice::from_ref, - sync::Arc, - time::{Duration, Instant}, -}; -use tempfile::TempDir; - use crate::client::connection::FlussConnection; use crate::client::credentials::SecurityTokenManager; use crate::client::metadata::Metadata; @@ -44,12 +33,16 @@ use crate::record::{ use crate::rpc::{RpcClient, RpcError, message}; use crate::util::FairBucketStatusMap; use crate::{PartitionId, TableId}; - -const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024; -#[allow(dead_code)] -const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024; -const LOG_FETCH_MIN_BYTES: i32 = 1; -const LOG_FETCH_WAIT_MAX_TIME: i32 = 500; +use arrow_schema::SchemaRef; +use log::{debug, warn}; +use parking_lot::{Mutex, RwLock}; +use std::{ + collections::{HashMap, HashSet}, + slice::from_ref, + sync::Arc, + time::{Duration, Instant}, +}; +use tempfile::TempDir; pub struct TableScan<'a> { conn: &'a FlussConnection, @@ -637,6 +630,10 @@ struct LogFetcher { log_fetch_buffer: Arc, nodes_with_pending_fetch_requests: Arc>>, max_poll_records: usize, + fetch_max_bytes: i32, + fetch_min_bytes: i32, + fetch_wait_max_time_ms: i32, + fetch_max_bytes_for_bucket: i32, } struct FetchResponseContext { @@ -697,6 +694,10 @@ impl LogFetcher { log_fetch_buffer, nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())), max_poll_records: config.scanner_log_max_poll_records, + fetch_max_bytes: config.scanner_log_fetch_max_bytes, + fetch_min_bytes: config.scanner_log_fetch_min_bytes, + fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms, + fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket, }) } @@ -1479,8 +1480,7 @@ impl LogFetcher { partition_id: bucket.partition_id(), bucket_id: bucket.bucket_id(), fetch_offset: offset, - // 1M - max_fetch_bytes: 1024 * 1024, + max_fetch_bytes: self.fetch_max_bytes_for_bucket, }; fetch_log_req_for_buckets @@ -1514,10 +1514,10 @@ impl LogFetcher { let fetch_log_request = FetchLogRequest { follower_server_id: -1, - max_bytes: LOG_FETCH_MAX_BYTES, + max_bytes: self.fetch_max_bytes, tables_req: vec![req_for_table], - max_wait_ms: Some(LOG_FETCH_WAIT_MAX_TIME), - min_bytes: Some(LOG_FETCH_MIN_BYTES), + max_wait_ms: Some(self.fetch_wait_max_time_ms), + min_bytes: Some(self.fetch_min_bytes), }; (leader_id, fetch_log_request) }) @@ -1990,4 +1990,47 @@ mod tests { let result = validate_scan_support(&table_path, &table_info); assert!(result.is_ok()); } + #[tokio::test] + async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let status = Arc::new(LogScannerStatus::new()); + status.assign_scan_bucket(TableBucket::new(1, 0), 0); + + let config = crate::config::Config { + scanner_log_fetch_max_bytes: 1234, + scanner_log_fetch_min_bytes: 7, + scanner_log_fetch_wait_max_time_ms: 89, + scanner_log_fetch_max_bytes_for_bucket: 512, + ..crate::config::Config::default() + }; + + let fetcher = LogFetcher::new( + table_info, + Arc::new(RpcClient::new()), + metadata, + status, + &config, + None, + )?; + + let requests = fetcher.prepare_fetch_log_requests().await; + // In this test cluster, leader id should exist; but even if it changes, + // assert over all built requests. + assert!(!requests.is_empty()); + for req in requests.values() { + assert_eq!(req.max_bytes, 1234); + assert_eq!(req.min_bytes, Some(7)); + assert_eq!(req.max_wait_ms, Some(89)); + + for table_req in &req.tables_req { + for bucket_req in &table_req.buckets_req { + assert_eq!(bucket_req.max_fetch_bytes, 512); + } + } + } + Ok(()) + } } diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 438c9483..870fdce7 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -27,7 +27,11 @@ const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4; const DEFAULT_MAX_POLL_RECORDS: usize = 500; +const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024; +const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1; +const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500; const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100; +const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024; const DEFAULT_ACKS: &str = "all"; const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000; @@ -95,11 +99,31 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)] pub scanner_log_max_poll_records: usize, + /// Maximum bytes per fetch response for LogScanner. + /// Default: 16777216 (16MB) + #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)] + pub scanner_log_fetch_max_bytes: i32, + + /// Minimum bytes to accumulate before returning a fetch response. + /// Default: 1 + #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES)] + pub scanner_log_fetch_min_bytes: i32, + + /// Maximum time the server may wait (ms) to satisfy min-bytes. + /// Default: 500 + #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS)] + pub scanner_log_fetch_wait_max_time_ms: i32, + /// The maximum time to wait for a batch to be completed in milliseconds. /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT) #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)] pub writer_batch_timeout_ms: i64, + /// Maximum bytes per fetch response **per bucket** for LogScanner. + /// Default: 1048576 (1MB) + #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)] + pub scanner_log_fetch_max_bytes_for_bucket: i32, + /// Connect timeout in milliseconds for TCP transport connect. /// Default: 120000 (120 seconds). #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)] @@ -143,6 +167,22 @@ impl std::fmt::Debug for Config { "scanner_log_max_poll_records", &self.scanner_log_max_poll_records, ) + .field( + "scanner_log_fetch_max_bytes", + &self.scanner_log_fetch_max_bytes, + ) + .field( + "scanner_log_fetch_min_bytes", + &self.scanner_log_fetch_min_bytes, + ) + .field( + "scanner_log_fetch_max_bytes_for_bucket", + &self.scanner_log_fetch_max_bytes_for_bucket, + ) + .field( + "scanner_log_fetch_wait_max_time_ms", + &self.scanner_log_fetch_wait_max_time_ms, + ) .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms) .field("connect_timeout_ms", &self.connect_timeout_ms) .field("security_protocol", &self.security_protocol) @@ -166,6 +206,10 @@ impl Default for Config { remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY, scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS, + scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES, + scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES, + scanner_log_fetch_wait_max_time_ms: DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS, + scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS, connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS, security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL), @@ -209,6 +253,32 @@ impl Config { } Ok(()) } + pub fn validate_scanner_fetch(&self) -> Result<(), String> { + if self.scanner_log_fetch_min_bytes <= 0 { + return Err("scanner_log_fetch_min_bytes must be > 0".to_string()); + } + if self.scanner_log_fetch_max_bytes <= 0 { + return Err("scanner_log_fetch_max_bytes must be > 0".to_string()); + } + if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes { + return Err( + "scanner_log_fetch_max_bytes must be >= scanner_log_fetch_min_bytes".to_string(), + ); + } + if self.scanner_log_fetch_wait_max_time_ms < 0 { + return Err("scanner_log_fetch_wait_max_time_ms must be >= 0".to_string()); + } + if self.scanner_log_fetch_max_bytes_for_bucket <= 0 { + return Err("scanner_log_fetch_max_bytes_for_bucket must be > 0".to_string()); + } + if self.scanner_log_fetch_max_bytes_for_bucket > self.scanner_log_fetch_max_bytes { + return Err( + "scanner_log_fetch_max_bytes_for_bucket must be <= scanner_log_fetch_max_bytes" + .to_string(), + ); + } + Ok(()) + } } #[cfg(test)] @@ -278,4 +348,31 @@ mod tests { }; assert!(config.validate_security().is_err()); } + #[test] + fn test_scanner_fetch_defaults_valid() { + let config = Config::default(); + assert!(config.validate_scanner_fetch().is_ok()); + assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024); + assert_eq!(config.scanner_log_fetch_min_bytes, 1); + assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500); + } + + #[test] + fn test_scanner_fetch_invalid_ranges() { + let config = Config { + scanner_log_fetch_min_bytes: 2, + scanner_log_fetch_max_bytes: 1, + ..Config::default() + }; + assert!(config.validate_scanner_fetch().is_err()); + } + + #[test] + fn test_scanner_fetch_negative_wait() { + let config = Config { + scanner_log_fetch_wait_max_time_ms: -1, + ..Config::default() + }; + assert!(config.validate_scanner_fetch().is_err()); + } } diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index debd311d..d14cf16d 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -28,6 +28,10 @@ Complete API reference for the Fluss C++ client. | `remote_file_download_thread_num` | `size_t` | `3` | Number of threads for remote log downloads | | `scanner_remote_log_read_concurrency` | `size_t` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned in a single Poll() | +| `scanner_log_fetch_max_bytes` | `int32_t` | `16777216` (16 MB) | Maximum bytes per fetch response for LogScanner | +| `scanner_log_fetch_min_bytes` | `int32_t` | `1` | Minimum bytes the server must accumulate before returning a fetch response | +| `scanner_log_fetch_wait_max_time_ms` | `int32_t` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes | +| `scanner_log_fetch_max_bytes_for_bucket`| `int32_t` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for LogScanner | | `connect_timeout_ms` | `uint64_t` | `120000` | TCP connect timeout in milliseconds | | `security_protocol` | `std::string` | `"PLAINTEXT"` | `"PLAINTEXT"` (default) or `"sasl"` for SASL auth | | `security_sasl_mechanism` | `std::string` | `"PLAIN"` | SASL mechanism (only `"PLAIN"` is supported) | diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md index f4b6309b..7fb5a6a9 100644 --- a/website/docs/user-guide/cpp/example/configuration.md +++ b/website/docs/user-guide/cpp/example/configuration.md @@ -36,6 +36,10 @@ config.scanner_remote_log_prefetch_num = 4; // Remote log pre config.remote_file_download_thread_num = 3; // Download threads config.scanner_remote_log_read_concurrency = 4; // In-file remote log read concurrency config.scanner_log_max_poll_records = 500; // Max records per poll +config.scanner_log_fetch_max_bytes = 16 * 1024 * 1024; // Max bytes per fetch response (16 MB) +config.scanner_log_fetch_min_bytes = 1; // Min bytes before server returns fetch response +config.scanner_log_fetch_wait_max_time_ms = 500; // Max wait (ms) to satisfy min-bytes +config.scanner_log_fetch_max_bytes_for_bucket = 1024 * 1024; // Max bytes per fetch response per bucket (1 MB) config.connect_timeout_ms = 120000; // TCP connect timeout (ms) ``` diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 8f7ab61b..d313a31e 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -21,6 +21,10 @@ Complete API reference for the Fluss Python client. | `remote_file_download_thread_num` | Get/set number of threads for remote log downloads | | `scanner_remote_log_read_concurrency` | Get/set streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | Get/set max number of records returned in a single poll() | +| `scanner_log_fetch_max_bytes` | Get/set maximum bytes per fetch response for LogScanner | +| `scanner_log_fetch_min_bytes` | Get/set minimum bytes the server must accumulate before returning a fetch response | +| `scanner_log_fetch_wait_max_time_ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes | +| `scanner_log_fetch_max_bytes_for_bucket` | Get/set maximum bytes per fetch response per bucket for LogScanner | | `connect_timeout_ms` | Get/set TCP connect timeout in milliseconds | | `security_protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) | | `security_sasl_mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) | diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md index 90b1249c..9f3c7abb 100644 --- a/website/docs/user-guide/python/example/configuration.md +++ b/website/docs/user-guide/python/example/configuration.md @@ -34,6 +34,10 @@ with await fluss.FlussConnection.create(config) as conn: | `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` | | `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` | | `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` | +| `scanner.log.fetch.max-bytes` | Maximum bytes per fetch response for LogScanner | `16777216` (16 MB) | +| `scanner.log.fetch.min-bytes` | Minimum bytes the server must accumulate before returning a fetch response | `1` | +| `scanner.log.fetch.wait-max-time-ms` | Maximum time (ms) the server may wait to satisfy min-bytes | `500` | +| `scanner.log.fetch.max-bytes-for-bucket`| Maximum bytes per fetch response per bucket for LogScanner | `1048576` (1 MB) | | `connect-timeout` | TCP connect timeout in milliseconds | `120000` | | `security.protocol` | `PLAINTEXT` (default) or `sasl` for SASL auth | `PLAINTEXT` | | `security.sasl.mechanism` | SASL mechanism (only `PLAIN` is supported) | `PLAIN` | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index d539a860..7f455226 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -20,6 +20,10 @@ Complete API reference for the Fluss Rust client. | `remote_file_download_thread_num` | `usize` | `3` | Number of threads for remote log downloads | | `scanner_remote_log_read_concurrency` | `usize` | `4` | Streaming read concurrency within a remote log file | | `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single poll() | +| `scanner_log_fetch_max_bytes` | `i32` | `16777216` (16 MB) | Maximum bytes per fetch response for LogScanner | +| `scanner_log_fetch_min_bytes` | `i32` | `1` | Minimum bytes the server must accumulate before returning a fetch response | +| `scanner_log_fetch_wait_max_time_ms` | `i32` | `500` | Maximum time (ms) the server may wait to satisfy min-bytes | +| `scanner_log_fetch_max_bytes_for_bucket`| `i32` | `1048576` (1 MB) | Maximum bytes per fetch response per bucket for LogScanner | | `connect_timeout_ms` | `u64` | `120000` | TCP connect timeout in milliseconds | | `security_protocol` | `String` | `"PLAINTEXT"` | `PLAINTEXT` (default) or `sasl` for SASL auth | | `security_sasl_mechanism` | `String` | `"PLAIN"` | SASL mechanism (only `PLAIN` is supported) | diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md index f6340c97..10e2692a 100644 --- a/website/docs/user-guide/rust/example/configuration.md +++ b/website/docs/user-guide/rust/example/configuration.md @@ -30,8 +30,12 @@ let conn = FlussConnection::new(config).await?; | `remote_file_download_thread_num` | Number of concurrent remote log file downloads | `3` | | `scanner_remote_log_read_concurrency` | Streaming read concurrency within a remote log file | `4` | | `scanner_log_max_poll_records` | Maximum records returned in a single `poll()` | `500` | +| `scanner_log_fetch_max_bytes` | Maximum bytes per fetch response for LogScanner | 16 MB | +| `scanner_log_fetch_min_bytes` | Minimum bytes the server must accumulate before returning a fetch response | `1` | +| `scanner_log_fetch_wait_max_time_ms` | Maximum time (ms) the server may wait to satisfy min-bytes | `500` | +| `scanner_log_fetch_max_bytes_for_bucket`| Maximum bytes per fetch response per bucket for LogScanner | 1 MB | | `connect_timeout_ms` | TCP connect timeout in milliseconds | 120000 | -| `security_protocol` | `PLAINTEXT` (default) or `sasl` for SASL auth | `PLAINTEXT` | +| `security_protocol` | `PLAINTEXT` (default) or `sasl` for SASL auth | `PLAINTEXT` | | `security_sasl_mechanism` | SASL mechanism (only `PLAIN` is supported) | `PLAIN` | | `security_sasl_username` | SASL username (required when protocol is `sasl`) | (empty) | | `security_sasl_password` | SASL password (required when protocol is `sasl`) | (empty) |