[client] Add LookupResult::to_record_batch() #411

Prajwal-banakar wants to merge 3 commits into apache:main
Conversation
Pull request overview
This PR adds an Arrow interoperability API on the client lookup path by exposing lookup results as an Arrow RecordBatch, enabling easier integration with DataFusion and other Arrow-based tooling.
Changes:
- Added `LookupResult::to_record_batch() -> Result<RecordBatch>` to convert lookup result rows into a columnar Arrow `RecordBatch`.
- Implemented the conversion using the existing `RowAppendRecordBatchBuilder`, producing an empty `RecordBatch` (with schema) when there are no rows.
```rust
for bytes in &self.rows {
    let row = CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]);
    builder.append(&row)?;
```
&bytes[SCHEMA_ID_LENGTH..] will panic if a returned row payload is shorter than SCHEMA_ID_LENGTH (e.g., corrupted/partial response). Since this is a new public API, it would be better to avoid panicking here and instead return a structured Error by using a checked slice (bytes.get(SCHEMA_ID_LENGTH..)) and failing gracefully if it’s missing.
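A minimal sketch of the checked-slice pattern the comment suggests, with a stand-in `SCHEMA_ID_LENGTH` value and a plain `String` error in place of the crate's `Error` type:

```rust
// Stand-in constant; the real value lives in the fluss crate.
const SCHEMA_ID_LENGTH: usize = 4;

/// Returns the row payload after the schema-id prefix, or an error
/// instead of panicking when the row is shorter than the prefix.
fn payload(bytes: &[u8]) -> Result<&[u8], String> {
    // `get` returns None for an out-of-range start index; `ok_or_else`
    // turns that into a structured error the caller can handle.
    bytes
        .get(SCHEMA_ID_LENGTH..)
        .ok_or_else(|| format!("row payload shorter than {} bytes", SCHEMA_ID_LENGTH))
}

fn main() {
    // A well-formed row: 4 prefix bytes, then the payload.
    assert_eq!(payload(&[0, 0, 0, 1, 0xAA]).unwrap(), &[0xAA]);
    // A truncated row no longer panics; it surfaces as Err.
    assert!(payload(&[0, 0]).is_err());
}
```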
```rust
}
let arc_batch = builder.build_arrow_record_batch()?;
// Unwrap the Arc — if we're the only owner, take it directly; otherwise clone.
Ok(Arc::try_unwrap(arc_batch).unwrap_or_else(|arc: Arc<RecordBatch>| (*arc).clone()))
```
The Arc::try_unwrap(...).unwrap_or_else(...) logic is equivalent to Arc::unwrap_or_clone(arc_batch) on the current MSRV (1.85), which is simpler and avoids the extra closure/type annotation noise.
```diff
-Ok(Arc::try_unwrap(arc_batch).unwrap_or_else(|arc: Arc<RecordBatch>| (*arc).clone()))
+Ok(Arc::unwrap_or_clone(arc_batch))
```
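For illustration, a generic sketch (not the PR's code) of how `Arc::unwrap_or_clone` handles both ownership cases in one call:

```rust
use std::sync::Arc;

fn main() {
    // Sole owner: the inner value is moved out with no clone.
    let sole = Arc::new(vec![1, 2, 3]);
    let v = Arc::unwrap_or_clone(sole);
    assert_eq!(v, vec![1, 2, 3]);

    // Shared ownership: the inner value is cloned and the other
    // handle stays valid, matching the unwrap_or_else fallback.
    let shared = Arc::new(String::from("batch"));
    let other = Arc::clone(&shared);
    let s = Arc::unwrap_or_clone(shared);
    assert_eq!(s, "batch");
    assert_eq!(*other, "batch");
}
```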
Hi @luoyuxia @fresh-borzoni, addressed the Copilot review, PTAL!
fresh-borzoni left a comment
@Prajwal-banakar Ty for the PR, left some comments
PTAL
```rust
use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::record::ArrowRecordBatchInnerBuilder;
```
Do we use it?
If the trait import exists just to make trait methods callable, it might be a better idea to expose a helper method on RowAppendRecordBatchBuilder.
This import is still required. In Rust, trait methods can only be called if the trait is in scope. RowAppendRecordBatchBuilder implements ArrowRecordBatchInnerBuilder, so without this import, .append() and .build_arrow_record_batch() are not callable. Removing it breaks the build.
Yes, I understand the trait needs to be in scope, my concern is that ArrowRecordBatchInnerBuilder is an internal trait leaking into this module.
Could you add inherent methods on RowAppendRecordBatchBuilder that delegate to the trait methods?
Then the trait stays internal and this module just calls builder.append() / builder.build_arrow_record_batch() without the import.
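A hypothetical sketch of that delegation pattern, using simplified stand-in names (`InnerBuilder`, `RowAppendBuilder`, `u32` rows) rather than the crate's actual types:

```rust
mod record {
    // Internal trait: stays private to the crate's record module.
    pub(crate) trait InnerBuilder {
        fn append_inner(&mut self, row: u32);
        fn build_inner(self) -> Vec<u32>;
    }

    pub struct RowAppendBuilder {
        rows: Vec<u32>,
    }

    impl InnerBuilder for RowAppendBuilder {
        fn append_inner(&mut self, row: u32) {
            self.rows.push(row);
        }
        fn build_inner(self) -> Vec<u32> {
            self.rows
        }
    }

    // Inherent methods delegate to the trait, so callers never
    // need to import InnerBuilder to use the builder.
    impl RowAppendBuilder {
        pub fn new() -> Self {
            Self { rows: Vec::new() }
        }
        pub fn append(&mut self, row: u32) {
            InnerBuilder::append_inner(self, row)
        }
        pub fn build(self) -> Vec<u32> {
            InnerBuilder::build_inner(self)
        }
    }
}

fn main() {
    // Note: no `use record::InnerBuilder;` in the calling module.
    let mut b = record::RowAppendBuilder::new();
    b.append(7);
    assert_eq!(b.build(), vec![7]);
}
```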
```rust
/// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
///   batch (with the correct schema) if the result set is empty.
/// - `Err(Error)` - If the conversion fails.
pub fn to_record_batch(&self) -> Result<RecordBatch> {
```
It's a new user-facing API, shall we add tests and docs?
```rust
let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;

for bytes in &self.rows {
    let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
```
Do you mind also changing get_single_row() and get_rows() to the same pattern?
Otherwise it's inconsistent with this change.
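A hedged sketch of what a `Result`-returning `get_rows()` could look like under that pattern, using a simplified stand-in `LookupResult` that stores raw row bytes:

```rust
// Stand-in constant and type for illustration only.
const SCHEMA_ID_LENGTH: usize = 4;

struct LookupResult {
    rows: Vec<Vec<u8>>,
}

impl LookupResult {
    /// Same checked slice as to_record_batch(), with the signature
    /// changed to Result so short rows surface as errors, not panics.
    fn get_rows(&self) -> Result<Vec<&[u8]>, String> {
        self.rows
            .iter()
            .map(|bytes| {
                bytes
                    .get(SCHEMA_ID_LENGTH..)
                    .ok_or_else(|| "row payload too short".to_string())
            })
            .collect() // collects Iterator<Item = Result<..>> into Result<Vec<..>>
    }
}

fn main() {
    let ok = LookupResult { rows: vec![vec![0, 0, 0, 1, 9]] };
    assert_eq!(ok.get_rows().unwrap(), vec![&[9u8][..]]);

    let bad = LookupResult { rows: vec![vec![0]] };
    assert!(bad.get_rows().is_err());
}
```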
```rust
/// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
///   batch (with the correct schema) if the result set is empty.
/// - `Err(Error)` - If the conversion fails.
pub fn to_record_batch(&self) -> Result<RecordBatch> {
```
Could you clarify what you'd like here? The core to_record_batch() method is available on LookupResult. For Python, we could add a lookup_arrow() method on Lookuper that returns a PyArrow RecordBatch — but I wanted to confirm the expected API shape before adding it.
Let's leave the bindings out of scope for now.
Probably we'd want just a to_arrow() on the lookup result, but better in a follow-up PR.
```rust
    builder.append(&row)?;
}

let arc_batch = builder.build_arrow_record_batch()?;
```
```rust
builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
```
Purpose
Linked issue: close #378
The purpose of this change is to expose lookup results in Arrow format by adding a `to_record_batch()` method to `LookupResult`. This enables integration with DataFusion and other Arrow-based tools.

Brief change log

- Added a `to_record_batch()` method to `LookupResult` in `crates/fluss/src/client/table/lookup.rs`
- Used `RowAppendRecordBatchBuilder` to append each row and build an Arrow `RecordBatch`
- Returns an empty `RecordBatch` with the correct schema if the result set is empty
- Added inherent `append()` and `build_arrow_record_batch()` methods on `RowAppendRecordBatchBuilder` to avoid leaking the internal `ArrowRecordBatchInnerBuilder` trait into the lookup module
- Changed `get_single_row()` and `get_rows()` to use panic-safe slice access (`bytes.get(SCHEMA_ID_LENGTH..)`) for consistency
- Changed the `get_rows()` return type from `Vec<CompactedRow>` to `Result<Vec<CompactedRow>>` for consistency with the new pattern

Tests

- Tests for `to_record_batch()`: empty result, valid row, and too-short payload error case
- `cargo fmt` and `cargo clippy` pass cleanly

API and Format

- `LookupResult::to_record_batch() -> Result<RecordBatch>` added to the client API
- `get_rows()` return type changed from `Vec<CompactedRow>` to `Result<Vec<CompactedRow>>`

Documentation

- `LookupResult` section in `website/docs/user-guide/rust/api-reference.md`
- `to_record_batch()` usage example in `website/docs/user-guide/rust/example/primary-key-tables.md`