[client] Add LookupResult::to_record_batch() #411

Open

Prajwal-banakar wants to merge 3 commits into apache:main from Prajwal-banakar:Add-LookupResult
Conversation

@Prajwal-banakar (Contributor) commented Mar 2, 2026

Purpose

Linked issue: close #378

The purpose of this change is to expose lookup results in Arrow format by adding a to_record_batch() method to LookupResult. This enables integration with DataFusion and other Arrow-based tools.

Brief change log

  • Added to_record_batch() method to LookupResult in crates/fluss/src/client/table/lookup.rs
  • The method uses the existing RowAppendRecordBatchBuilder to append each row and build an Arrow RecordBatch
  • Returns an empty RecordBatch with the correct schema if the result set is empty
  • Added inherent append() and build_arrow_record_batch() methods on RowAppendRecordBatchBuilder to avoid leaking the internal ArrowRecordBatchInnerBuilder trait into the lookup module
  • Updated get_single_row() and get_rows() to use panic-safe slice access (bytes.get(SCHEMA_ID_LENGTH..)) for consistency
  • Updated get_rows() return type from Vec<CompactedRow> to Result<Vec<CompactedRow>> for consistency with the new pattern
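The conversion described in the change log can be sketched with a toy builder. `ToyBuilder` and the `Vec<Vec<u8>>` "batch" below are stand-ins for the crate's `RowAppendRecordBatchBuilder` and Arrow `RecordBatch`, and the `SCHEMA_ID_LENGTH` value is illustrative; this only shows the loop shape, the checked slice access, and the empty-result behavior:

```rust
// Illustrative constant; the real value lives in the fluss crate.
const SCHEMA_ID_LENGTH: usize = 2;

// Stand-in for RowAppendRecordBatchBuilder.
struct ToyBuilder {
    rows: Vec<Vec<u8>>,
}

impl ToyBuilder {
    fn new() -> Self {
        Self { rows: Vec::new() }
    }
    fn append(&mut self, payload: &[u8]) {
        self.rows.push(payload.to_vec());
    }
    fn build(self) -> Vec<Vec<u8>> {
        self.rows
    }
}

fn to_batch(rows: &[Vec<u8>]) -> Result<Vec<Vec<u8>>, String> {
    let mut builder = ToyBuilder::new();
    for bytes in rows {
        // Checked slice access: fail gracefully instead of panicking on
        // payloads shorter than the schema-id prefix.
        let payload = bytes
            .get(SCHEMA_ID_LENGTH..)
            .ok_or_else(|| "row payload too short".to_string())?;
        builder.append(payload);
    }
    // An empty input yields an empty batch, not an error.
    Ok(builder.build())
}

fn main() {
    assert_eq!(to_batch(&[]).unwrap().len(), 0);
    assert_eq!(to_batch(&[vec![0, 1, 9]]).unwrap(), vec![vec![9u8]]);
    assert!(to_batch(&[vec![0]]).is_err());
}
```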

Tests

  • All 190 existing unit tests pass with no regressions
  • Added 3 unit tests for to_record_batch(): empty result, valid row, and too-short payload error case
  • cargo fmt and cargo clippy pass cleanly

API and Format

  • No changes to storage format
  • New public method LookupResult::to_record_batch() -> Result<RecordBatch> added to the client API
  • get_rows() return type changed from Vec<CompactedRow> to Result<Vec<CompactedRow>>

Documentation

  • Updated LookupResult section in website/docs/user-guide/rust/api-reference.md
  • Added to_record_batch() usage example in
    website/docs/user-guide/rust/example/primary-key-tables.md

Copilot AI left a comment
Pull request overview

This PR adds an Arrow interoperability API on the client lookup path by exposing lookup results as an Arrow RecordBatch, enabling easier integration with DataFusion and other Arrow-based tooling.

Changes:

  • Added LookupResult::to_record_batch() -> Result<RecordBatch> to convert lookup result rows into a columnar Arrow RecordBatch.
  • Implemented the conversion using the existing RowAppendRecordBatchBuilder, producing an empty RecordBatch (with schema) when there are no rows.


Comment on lines +101 to +103
for bytes in &self.rows {
let row = CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]);
builder.append(&row)?;

Copilot AI Mar 3, 2026

&bytes[SCHEMA_ID_LENGTH..] will panic if a returned row payload is shorter than SCHEMA_ID_LENGTH (e.g., corrupted/partial response). Since this is a new public API, it would be better to avoid panicking here and instead return a structured Error by using a checked slice (bytes.get(SCHEMA_ID_LENGTH..)) and failing gracefully if it’s missing.

}
let arc_batch = builder.build_arrow_record_batch()?;
// Unwrap the Arc — if we're the only owner, take it directly; otherwise clone.
Ok(Arc::try_unwrap(arc_batch).unwrap_or_else(|arc: Arc<RecordBatch>| (*arc).clone()))

Copilot AI Mar 3, 2026


The Arc::try_unwrap(...).unwrap_or_else(...) logic is equivalent to Arc::unwrap_or_clone(arc_batch) on the current MSRV (1.85), which is simpler and avoids the extra closure/type annotation noise.

Suggested change
Ok(Arc::try_unwrap(arc_batch).unwrap_or_else(|arc: Arc<RecordBatch>| (*arc).clone()))
Ok(Arc::unwrap_or_clone(arc_batch))
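The suggested `Arc::unwrap_or_clone` (stable since Rust 1.76, so well within the stated MSRV of 1.85) does exactly what the manual `try_unwrap` + `unwrap_or_else` dance did. A minimal standalone illustration of both ownership cases:

```rust
use std::sync::Arc;

fn main() {
    // Sole owner: the inner value is moved out without cloning.
    let sole = Arc::new(vec![1, 2, 3]);
    let v = Arc::unwrap_or_clone(sole);
    assert_eq!(v, vec![1, 2, 3]);

    // Shared ownership: the inner value is cloned and other Arcs stay valid.
    let shared = Arc::new(String::from("batch"));
    let keep = Arc::clone(&shared);
    let s = Arc::unwrap_or_clone(shared);
    assert_eq!(s, "batch");
    assert_eq!(*keep, "batch");
}
```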

@Prajwal-banakar (Contributor, Author)

Hi @luoyuxia @fresh-borzoni, addressed the Copilot review, PTAL!

@fresh-borzoni (Contributor) left a comment

@Prajwal-banakar Ty for the PR, left some comments
PTAL

use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::record::ArrowRecordBatchInnerBuilder;

@fresh-borzoni (Contributor) Mar 3, 2026

Do we use it?

If we import the trait just to make a trait method callable, it might be a better idea to expose a helper method on RowAppendRecordBatchBuilder.


Contributor Author


This import is still required. In Rust, trait methods can only be called if the trait is in scope. RowAppendRecordBatchBuilder implements ArrowRecordBatchInnerBuilder, so without this import, .append() and .build_arrow_record_batch() are not callable. Removing it breaks the build.


Contributor


Yes, I understand the trait needs to be in scope, my concern is that ArrowRecordBatchInnerBuilder is an internal trait leaking into this module.

Could you add inherent methods on RowAppendRecordBatchBuilder that delegate to the trait methods?
Then the trait stays internal and this module just calls builder.append() / builder.build_arrow_record_batch() without the import.
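The delegation pattern suggested here can be sketched as follows. All names are illustrative stand-ins for the crate's real types (`InnerBuilder` for `ArrowRecordBatchInnerBuilder`, `RowBuilder` for `RowAppendRecordBatchBuilder`, strings for rows):

```rust
mod record {
    // Internal trait: stays private to this module's crate.
    pub(crate) trait InnerBuilder {
        fn append_inner(&mut self, row: &str);
        fn finish_inner(self) -> Vec<String>;
    }

    pub struct RowBuilder {
        rows: Vec<String>,
    }

    impl InnerBuilder for RowBuilder {
        fn append_inner(&mut self, row: &str) {
            self.rows.push(row.to_string());
        }
        fn finish_inner(self) -> Vec<String> {
            self.rows
        }
    }

    impl RowBuilder {
        pub fn new() -> Self {
            Self { rows: Vec::new() }
        }
        // Inherent methods delegate to the trait, so callers never need
        // `use record::InnerBuilder;` in scope.
        pub fn append(&mut self, row: &str) {
            InnerBuilder::append_inner(self, row);
        }
        pub fn build(self) -> Vec<String> {
            InnerBuilder::finish_inner(self)
        }
    }
}

fn main() {
    // No trait import at the call site.
    let mut b = record::RowBuilder::new();
    b.append("a");
    b.append("b");
    assert_eq!(b.build(), vec!["a".to_string(), "b".to_string()]);
}
```

Inherent methods take priority over trait methods in method resolution, so this keeps the public surface small without any ambiguity at call sites.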

/// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
/// batch (with the correct schema) if the result set is empty.
/// - `Err(Error)` - If the conversion fails.
pub fn to_record_batch(&self) -> Result<RecordBatch> {

Contributor


It's a new user-facing API, shall we add tests and docs?


Contributor Author


Done, added!

let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;

for bytes in &self.rows {
let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {

Contributor


Do you mind also changing get_single_row() and get_rows() to the same pattern? Otherwise they're inconsistent with this change.

/// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
/// batch (with the correct schema) if the result set is empty.
/// - `Err(Error)` - If the conversion fails.
pub fn to_record_batch(&self) -> Result<RecordBatch> {

Contributor


what about bindings?


Contributor Author


Could you clarify what you'd like here? The core to_record_batch() method is available on LookupResult. For Python, we could add a lookup_arrow() method on Lookuper that returns a PyArrow RecordBatch — but I wanted to confirm the expected API shape before adding it.


Contributor


Let's leave the bindings out of scope for now. We'd probably want just a to_arrow on the lookup result, but better in a follow-up PR.

builder.append(&row)?;
}

let arc_batch = builder.build_arrow_record_batch()?;

Contributor


builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)


Contributor Author


done
