Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,14 @@ struct Configuration {
size_t scanner_remote_log_read_concurrency{4};
// Maximum number of records returned in a single call to Poll() for LogScanner
size_t scanner_log_max_poll_records{500};
// Maximum bytes per fetch response for LogScanner (16 MB)
int32_t scanner_log_fetch_max_bytes{16 * 1024 * 1024};
// Minimum bytes to accumulate before server returns a fetch response
int32_t scanner_log_fetch_min_bytes{1};
// Maximum time (ms) the server may wait to satisfy min bytes
int32_t scanner_log_fetch_wait_max_time_ms{500};
// Maximum bytes per fetch response per bucket for LogScanner (1 MB)
int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024};
int64_t writer_batch_timeout_ms{100};
// Connect timeout in milliseconds for TCP transport connect
uint64_t connect_timeout_ms{120000};
Expand Down
4 changes: 4 additions & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency;
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
ffi_config.scanner_log_fetch_max_bytes = config.scanner_log_fetch_max_bytes;
ffi_config.scanner_log_fetch_min_bytes = config.scanner_log_fetch_min_bytes;
ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms;
ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.connect_timeout_ms = config.connect_timeout_ms;
ffi_config.security_protocol = rust::String(config.security_protocol);
Expand Down
8 changes: 8 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ mod ffi {
remote_file_download_thread_num: usize,
scanner_remote_log_read_concurrency: usize,
scanner_log_max_poll_records: usize,
scanner_log_fetch_max_bytes: i32,
scanner_log_fetch_min_bytes: i32,
scanner_log_fetch_wait_max_time_ms: i32,
scanner_log_fetch_max_bytes_for_bucket: i32,
writer_batch_timeout_ms: i64,
connect_timeout_ms: u64,
security_protocol: String,
Expand Down Expand Up @@ -653,6 +657,10 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
remote_file_download_thread_num: config.remote_file_download_thread_num,
scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
scanner_log_fetch_max_bytes: config.scanner_log_fetch_max_bytes,
scanner_log_fetch_min_bytes: config.scanner_log_fetch_min_bytes,
scanner_log_fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
scanner_log_fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
connect_timeout_ms: config.connect_timeout_ms,
security_protocol: config.security_protocol.to_string(),
security_sasl_mechanism: config.security_sasl_mechanism.to_string(),
Expand Down
16 changes: 16 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ class Config:
@scanner_log_max_poll_records.setter
def scanner_log_max_poll_records(self, num: int) -> None: ...
@property
# Maximum bytes per fetch response for LogScanner ("scanner.log.fetch.max-bytes").
def scanner_log_fetch_max_bytes(self) -> int: ...
@scanner_log_fetch_max_bytes.setter
def scanner_log_fetch_max_bytes(self, bytes: int) -> None: ...
@property
# Minimum bytes the server accumulates before answering a fetch ("scanner.log.fetch.min-bytes").
def scanner_log_fetch_min_bytes(self) -> int: ...
@scanner_log_fetch_min_bytes.setter
def scanner_log_fetch_min_bytes(self, bytes: int) -> None: ...
@property
# Maximum time in ms the server may wait to satisfy min-bytes ("scanner.log.fetch.wait-max-time-ms").
def scanner_log_fetch_wait_max_time_ms(self) -> int: ...
@scanner_log_fetch_wait_max_time_ms.setter
def scanner_log_fetch_wait_max_time_ms(self, ms: int) -> None: ...
@property
# Maximum bytes per bucket in a fetch response ("scanner.log.fetch.max-bytes-for-bucket").
def scanner_log_fetch_max_bytes_for_bucket(self) -> int: ...
@scanner_log_fetch_max_bytes_for_bucket.setter
def scanner_log_fetch_max_bytes_for_bucket(self, bytes: int) -> None: ...
@property
def writer_batch_timeout_ms(self) -> int: ...
@writer_batch_timeout_ms.setter
def writer_batch_timeout_ms(self, timeout: int) -> None: ...
Expand Down
74 changes: 74 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,32 @@ impl Config {
))
})?;
}
"scanner.log.fetch.max-bytes" => {
config.scanner_log_fetch_max_bytes = value.parse::<i32>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"scanner.log.fetch.min-bytes" => {
config.scanner_log_fetch_min_bytes = value.parse::<i32>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"scanner.log.fetch.wait-max-time-ms" => {
config.scanner_log_fetch_wait_max_time_ms =
value.parse::<i32>().map_err(|e| {
FlussError::new_err(format!(
"Invalid value '{value}' for '{key}': {e}"
))
})?;
}
"scanner.log.fetch.max-bytes-for-bucket" => {
config.scanner_log_fetch_max_bytes_for_bucket =
value.parse::<i32>().map_err(|e| {
FlussError::new_err(format!(
"Invalid value '{value}' for '{key}': {e}"
))
})?;
}
"writer.bucket.no-key-assigner" => {
config.writer_bucket_no_key_assigner = match value.as_str() {
"round_robin" => fcore::config::NoKeyAssigner::RoundRobin,
Expand Down Expand Up @@ -314,6 +340,54 @@ impl Config {
fn set_security_sasl_password(&mut self, password: String) {
self.inner.security_sasl_password = password;
}

/// Get the maximum bytes per fetch response for LogScanner.
/// Mirrors the "scanner.log.fetch.max-bytes" string config key.
#[getter]
fn scanner_log_fetch_max_bytes(&self) -> i32 {
self.inner.scanner_log_fetch_max_bytes
}

/// Set the maximum bytes per fetch response for LogScanner.
/// Mirrors the "scanner.log.fetch.max-bytes" string config key.
#[setter]
fn set_scanner_log_fetch_max_bytes(&mut self, bytes: i32) {
self.inner.scanner_log_fetch_max_bytes = bytes;
}

/// Get the minimum bytes the server accumulates before returning a fetch response.
/// Mirrors the "scanner.log.fetch.min-bytes" string config key.
#[getter]
fn scanner_log_fetch_min_bytes(&self) -> i32 {
self.inner.scanner_log_fetch_min_bytes
}

/// Set the minimum bytes the server accumulates before returning a fetch response.
/// Mirrors the "scanner.log.fetch.min-bytes" string config key.
#[setter]
fn set_scanner_log_fetch_min_bytes(&mut self, bytes: i32) {
self.inner.scanner_log_fetch_min_bytes = bytes;
}

/// Get the maximum time (ms) the server may wait to satisfy min-bytes.
/// Mirrors the "scanner.log.fetch.wait-max-time-ms" string config key.
#[getter]
fn scanner_log_fetch_wait_max_time_ms(&self) -> i32 {
self.inner.scanner_log_fetch_wait_max_time_ms
}

/// Set the maximum time (ms) the server may wait to satisfy min-bytes.
/// Mirrors the "scanner.log.fetch.wait-max-time-ms" string config key.
#[setter]
fn set_scanner_log_fetch_wait_max_time_ms(&mut self, ms: i32) {
self.inner.scanner_log_fetch_wait_max_time_ms = ms;
}

/// Get the maximum bytes per fetch response per bucket for LogScanner.
/// Mirrors the "scanner.log.fetch.max-bytes-for-bucket" string config key.
#[getter]
fn scanner_log_fetch_max_bytes_for_bucket(&self) -> i32 {
self.inner.scanner_log_fetch_max_bytes_for_bucket
}

/// Set the maximum bytes per fetch response per bucket for LogScanner.
/// Mirrors the "scanner.log.fetch.max-bytes-for-bucket" string config key.
#[setter]
fn set_scanner_log_fetch_max_bytes_for_bucket(&mut self, bytes: i32) {
self.inner.scanner_log_fetch_max_bytes_for_bucket = bytes;
}
}

impl Config {
Expand Down
2 changes: 2 additions & 0 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ impl FlussConnection {
pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch()
.map_err(|msg| Error::IllegalArgument { message: msg })?;

let timeout = Duration::from_millis(arg.connect_timeout_ms);
let connections = if arg.is_sasl_enabled() {
Expand Down
87 changes: 65 additions & 22 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::SchemaRef;
use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
use std::{
collections::{HashMap, HashSet},
slice::from_ref,
sync::Arc,
time::{Duration, Instant},
};
use tempfile::TempDir;

use crate::client::connection::FlussConnection;
use crate::client::credentials::SecurityTokenManager;
use crate::client::metadata::Metadata;
Expand All @@ -44,12 +33,16 @@ use crate::record::{
use crate::rpc::{RpcClient, RpcError, message};
use crate::util::FairBucketStatusMap;
use crate::{PartitionId, TableId};

const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
const LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024;
const LOG_FETCH_MIN_BYTES: i32 = 1;
const LOG_FETCH_WAIT_MAX_TIME: i32 = 500;
use arrow_schema::SchemaRef;
use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
use std::{
collections::{HashMap, HashSet},
slice::from_ref,
sync::Arc,
time::{Duration, Instant},
};
use tempfile::TempDir;

pub struct TableScan<'a> {
conn: &'a FlussConnection,
Expand Down Expand Up @@ -637,6 +630,10 @@ struct LogFetcher {
log_fetch_buffer: Arc<LogFetchBuffer>,
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
max_poll_records: usize,
fetch_max_bytes: i32,
fetch_min_bytes: i32,
fetch_wait_max_time_ms: i32,
fetch_max_bytes_for_bucket: i32,
}

struct FetchResponseContext {
Expand Down Expand Up @@ -697,6 +694,10 @@ impl LogFetcher {
log_fetch_buffer,
nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
max_poll_records: config.scanner_log_max_poll_records,
fetch_max_bytes: config.scanner_log_fetch_max_bytes,
fetch_min_bytes: config.scanner_log_fetch_min_bytes,
fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
})
}

Expand Down Expand Up @@ -1479,8 +1480,7 @@ impl LogFetcher {
partition_id: bucket.partition_id(),
bucket_id: bucket.bucket_id(),
fetch_offset: offset,
// 1M
max_fetch_bytes: 1024 * 1024,
max_fetch_bytes: self.fetch_max_bytes_for_bucket,
};

fetch_log_req_for_buckets
Expand Down Expand Up @@ -1514,10 +1514,10 @@ impl LogFetcher {

let fetch_log_request = FetchLogRequest {
follower_server_id: -1,
max_bytes: LOG_FETCH_MAX_BYTES,
max_bytes: self.fetch_max_bytes,
tables_req: vec![req_for_table],
max_wait_ms: Some(LOG_FETCH_WAIT_MAX_TIME),
min_bytes: Some(LOG_FETCH_MIN_BYTES),
max_wait_ms: Some(self.fetch_wait_max_time_ms),
min_bytes: Some(self.fetch_min_bytes),
};
(leader_id, fetch_log_request)
})
Expand Down Expand Up @@ -1990,4 +1990,47 @@ mod tests {
let result = validate_scan_support(&table_path, &table_info);
assert!(result.is_ok());
}
// Verifies that LogFetcher threads the four configured scanner fetch
// parameters (max bytes, min bytes, wait time, per-bucket max bytes)
// through to the FetchLogRequest it builds, rather than hard-coded values.
#[tokio::test]
async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> Result<()> {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let table_info = build_table_info(table_path.clone(), 1, 1);
let cluster = build_cluster_arc(&table_path, 1, 1);
let metadata = Arc::new(Metadata::new_for_test(cluster));
let status = Arc::new(LogScannerStatus::new());
// Assign one bucket so prepare_fetch_log_requests produces at least one request.
status.assign_scan_bucket(TableBucket::new(1, 0), 0);

// Sentinel values chosen to be distinguishable from the defaults.
let config = crate::config::Config {
scanner_log_fetch_max_bytes: 1234,
scanner_log_fetch_min_bytes: 7,
scanner_log_fetch_wait_max_time_ms: 89,
scanner_log_fetch_max_bytes_for_bucket: 512,
..crate::config::Config::default()
};

let fetcher = LogFetcher::new(
table_info,
Arc::new(RpcClient::new()),
metadata,
status,
&config,
None,
)?;

let requests = fetcher.prepare_fetch_log_requests().await;
// In this test cluster, leader id should exist; but even if it changes,
// assert over all built requests.
assert!(!requests.is_empty());
for req in requests.values() {
assert_eq!(req.max_bytes, 1234);
assert_eq!(req.min_bytes, Some(7));
assert_eq!(req.max_wait_ms, Some(89));

for table_req in &req.tables_req {
for bucket_req in &table_req.buckets_req {
assert_eq!(bucket_req.max_fetch_bytes, 512);
}
}
}
Ok(())
}
}
Loading