Skip to content

[client] Allow configuring scanner fetch parameters#417

Open
Prajwal-banakar wants to merge 5 commits intoapache:mainfrom
Prajwal-banakar:configure-scanner
Open

[client] Allow configuring scanner fetch parameters#417
Prajwal-banakar wants to merge 5 commits intoapache:mainfrom
Prajwal-banakar:configure-scanner

Conversation

@Prajwal-banakar
Copy link
Contributor

Purpose

Linked issue: close #391

Make log scanner fetch parameters configurable (max/min bytes and max wait time) instead of using hardcoded constants, so users can tune scan behavior via Config.

Brief change log

  • Removed hardcoded scanner fetch constants from client/table/scanner.rs.
  • Added fetch tuning fields to LogFetcher and wired them from crate::config::Config.
  • Updated fetch request construction to use configured values (including per-bucket max fetch cap).
  • Added/updated unit tests to validate configured fetch params are applied.

Tests

  • cargo fmt --all
  • cargo clippy -p fluss-rs --all-features
  • cargo test -p fluss-rs
  • Doc-tests: cargo test -p fluss-rs --doc

API and Format

  • API: Yes (adds/uses scanner fetch tuning knobs in Config; no breaking changes intended if defaults remain the same)
  • Storage format: No

Documentation

  • No new user-facing docs added (existing configuration/usage remains the same; defaults unchanged).

Copy link
Contributor

@fresh-borzoni fresh-borzoni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Prajwal-banakar Ty for the PR, left some comments, PTAL


/// Maximum bytes per fetch response for LogScanner.
/// Default: 16777216 (16MB)
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New config fields need Python bindings (FfiConfig + .pyi stub), C++ bindings, and website docs (configuration.md for both languages). Also missing from api-reference.md.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bindings and docs updated, PTAL!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ty, ffi_converter.hpp wiring is still missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching it, updated!

max_fetch_bytes: 1024 * 1024,
max_fetch_bytes: self
.fetch_max_bytes
.min(DEFAULT_BUCKET_MAX_FETCH_BYTES),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This couples the total-response limit with the per-bucket cap, which is incorrect
Java has client.scanner.log.fetch.max-bytes-for-bucket for this

projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
config
.validate_scanner_fetch()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mb we should validate during connection creation like we do with SASL fields?

@fresh-borzoni
Copy link
Contributor

@Prajwal-banakar Ty, but I don't see cpp bindings have been wired

pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch() // ← add this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, i just wired cpp bindings, PTAL!

Copy link
Contributor

@fresh-borzoni fresh-borzoni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ty, @Prajwal-banakar Left comments, PTAL


/// Maximum bytes per fetch response for LogScanner.
/// Default: 16777216 (16MB)
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ty, ffi_converter.hpp wiring is still missing

"scanner_log_fetch_min_bytes",
&self.scanner_log_fetch_min_bytes,
)
.field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing scanner_log_fetch_max_bytes_for_bucket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

pub writer_batch_timeout_ms: i64,

/// Maximum bytes per fetch response **per bucket** for LogScanner.
/// Default: 1048576 (1MB) (or whatever DEFAULT_BUCKET_MAX_FETCH_BYTES is)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove 'or whatever ...' part, it's inconsistent

Copy link
Contributor

@fresh-borzoni fresh-borzoni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Prajwal-banakar TY, left minor comment, PTAL

ffi_config.scanner_log_fetch_wait_max_time_ms = config.scanner_log_fetch_wait_max_time_ms;
ffi_config.scanner_log_fetch_max_bytes_for_bucket = config.scanner_log_fetch_max_bytes_for_bucket;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicated line

))
})?;
}
"scanner.log.fetch-max-bytes" => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java uses scanner.log.fetch prefix, we should match it, also don't forget docs changes here

@Prajwal-banakar
Copy link
Contributor Author

Hi @fresh-borzoni sorry for the repeated back-and-forth I'm still getting familiar with the codebase. Fixed the remaining issues, Thanks for the patience and thorough review!

Copy link
Contributor

@fresh-borzoni fresh-borzoni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Prajwal-banakar All good, I understand it takes time, so happy to review and help.

Thank you, LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow to configure scanner fetch parameters

2 participants