[client] Allow configuring scanner fetch parameters#417
[client] Allow configuring scanner fetch parameters#417Prajwal-banakar wants to merge 5 commits intoapache:mainfrom
Conversation
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Prajwal-banakar Ty for the PR, left some comments, PTAL
|
|
||
| /// Maximum bytes per fetch response for LogScanner. | ||
| /// Default: 16777216 (16MB) | ||
| #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)] |
There was a problem hiding this comment.
New config fields need Python bindings (FfiConfig + .pyi stub), C++ bindings, and website docs (configuration.md for both languages). Also missing from api-reference.md.
There was a problem hiding this comment.
bindings and docs updated, PTAL!
There was a problem hiding this comment.
Ty, ffi_converter.hpp wiring is still missing
There was a problem hiding this comment.
Thanks for catching it, updated!
| max_fetch_bytes: 1024 * 1024, | ||
| max_fetch_bytes: self | ||
| .fetch_max_bytes | ||
| .min(DEFAULT_BUCKET_MAX_FETCH_BYTES), |
There was a problem hiding this comment.
This couples the total-response limit with the per-bucket cap, which is incorrect
Java has client.scanner.log.fetch.max-bytes-for-bucket for this
| projected_fields: Option<Vec<usize>>, | ||
| ) -> Result<Self> { | ||
| config | ||
| .validate_scanner_fetch() |
There was a problem hiding this comment.
mb we should validate during connection creation like we do with SASL fields?
|
@Prajwal-banakar Ty, but I don't see cpp bindings have been wired |
| pub async fn new(arg: Config) -> Result<Self> { | ||
| arg.validate_security() | ||
| .map_err(|msg| Error::IllegalArgument { message: msg })?; | ||
| arg.validate_scanner_fetch() // ← add this |
There was a problem hiding this comment.
we don't need this comment
There was a problem hiding this comment.
sorry, i just wired cpp bindings, PTAL!
fresh-borzoni
left a comment
There was a problem hiding this comment.
Ty, @Prajwal-banakar Left comments, PTAL
|
|
||
| /// Maximum bytes per fetch response for LogScanner. | ||
| /// Default: 16777216 (16MB) | ||
| #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)] |
There was a problem hiding this comment.
Ty, ffi_converter.hpp wiring is still missing
| "scanner_log_fetch_min_bytes", | ||
| &self.scanner_log_fetch_min_bytes, | ||
| ) | ||
| .field( |
There was a problem hiding this comment.
missing scanner_log_fetch_max_bytes_for_bucket
crates/fluss/src/config.rs
Outdated
| pub writer_batch_timeout_ms: i64, | ||
|
|
||
| /// Maximum bytes per fetch response **per bucket** for LogScanner. | ||
| /// Default: 1048576 (1MB) (or whatever DEFAULT_BUCKET_MAX_FETCH_BYTES is) |
There was a problem hiding this comment.
let's remove 'or whatever ...' part, it's inconsistent
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Prajwal-banakar TY, left minor comment, PTAL
bindings/cpp/src/ffi_converter.hpp
Outdated
| ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms; | ||
| ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket; | ||
| ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; | ||
| ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms; |
bindings/python/src/config.rs
Outdated
| )) | ||
| })?; | ||
| } | ||
| "scanner.log.fetch-max-bytes" => { |
There was a problem hiding this comment.
Java uses scanner.log.fetch prefix, we should match it, also don't forget docs changes here
|
Hi @fresh-borzoni sorry for the repeated back-and-forth I'm still getting familiar with the codebase. Fixed the remaining issues, Thanks for the patience and thorough review! |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Prajwal-banakar All good, I understand it takes time, so happy to review and help.
Thank you, LGTM
Purpose
Linked issue: close #391
Make log scanner fetch parameters configurable (max/min bytes and max wait time) instead of using hardcoded constants, so users can tune scan behavior via
Config.Brief change log
client/table/scanner.rs.LogFetcherand wired them fromcrate::config::Config.Tests
cargo fmt --allcargo clippy -p fluss-rs --all-featurescargo test -p fluss-rscargo test -p fluss-rs --docAPI and Format
Config; no breaking changes intended if defaults remain the same)Documentation