-
Notifications
You must be signed in to change notification settings - Fork 35
[client] Add LookupResult::to_record_batch() #411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,13 +20,15 @@ use crate::client::metadata::Metadata; | |
| use crate::client::table::partition_getter::PartitionGetter; | ||
| use crate::error::{Error, Result}; | ||
| use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; | ||
| use crate::record::RowAppendRecordBatchBuilder; | ||
| use crate::record::kv::SCHEMA_ID_LENGTH; | ||
| use crate::row::InternalRow; | ||
| use crate::row::compacted::CompactedRow; | ||
| use crate::row::encode::{KeyEncoder, KeyEncoderFactory}; | ||
| use crate::rpc::ApiError; | ||
| use crate::rpc::RpcClient; | ||
| use crate::rpc::message::LookupRequest; | ||
| use arrow::array::RecordBatch; | ||
| use std::sync::Arc; | ||
|
|
||
| /// The result of a lookup operation. | ||
|
|
@@ -59,32 +61,80 @@ impl LookupResult { | |
| /// `CompactedRow` borrows from this result set and cannot outlive it. | ||
| /// | ||
| /// # Returns | ||
| /// - `Ok(Some(row))`: If exactly one row exists. | ||
| /// - `Ok(None)`: If the result set is empty. | ||
| /// - `Err(Error::UnexpectedError)`: If the result set contains more than one row. | ||
| /// - `Ok(rows)` - All rows in the result set. | ||
| /// - `Err(Error)` - If any row payload is too short to contain a schema id. | ||
| /// | ||
| pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> { | ||
| match self.rows.len() { | ||
| 0 => Ok(None), | ||
| 1 => Ok(Some(CompactedRow::from_bytes( | ||
| &self.row_type, | ||
| &self.rows[0][SCHEMA_ID_LENGTH..], | ||
| ))), | ||
| 1 => { | ||
| let payload = | ||
| self.rows[0] | ||
| .get(SCHEMA_ID_LENGTH..) | ||
| .ok_or_else(|| Error::UnexpectedError { | ||
| message: format!( | ||
| "Row payload too short: {} bytes, need at least {} for schema id", | ||
| self.rows[0].len(), | ||
| SCHEMA_ID_LENGTH | ||
| ), | ||
| source: None, | ||
| })?; | ||
| Ok(Some(CompactedRow::from_bytes(&self.row_type, payload))) | ||
| } | ||
| _ => Err(Error::UnexpectedError { | ||
| message: "LookupResult contains multiple rows, use get_rows() instead".to_string(), | ||
| source: None, | ||
| }), | ||
| } | ||
| } | ||
|
|
||
| /// Returns all rows as CompactedRows. | ||
| pub fn get_rows(&self) -> Vec<CompactedRow<'_>> { | ||
| pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. we change the public API — shall we update the docs? |
||
| self.rows | ||
| .iter() | ||
| // TODO Add schema id check and fetch when implementing prefix lookup | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this comment should stay |
||
| .map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..])) | ||
| .map(|bytes| { | ||
| let payload = | ||
| bytes | ||
| .get(SCHEMA_ID_LENGTH..) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the schema checks are duplicated — let's move them to a helper, what do you think? |
||
| .ok_or_else(|| Error::UnexpectedError { | ||
| message: format!( | ||
| "Row payload too short: {} bytes, need at least {} for schema id", | ||
| bytes.len(), | ||
| SCHEMA_ID_LENGTH | ||
| ), | ||
| source: None, | ||
| })?; | ||
| Ok(CompactedRow::from_bytes(&self.row_type, payload)) | ||
| }) | ||
| .collect() | ||
| } | ||
| /// Converts all rows in this result into an Arrow [`RecordBatch`]. | ||
| /// | ||
| /// This is useful for integration with DataFusion or other Arrow-based tools. | ||
| /// | ||
| /// # Returns | ||
| /// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty | ||
| /// batch (with the correct schema) if the result set is empty. | ||
| /// - `Err(Error)` - If the conversion fails. | ||
| pub fn to_record_batch(&self) -> Result<RecordBatch> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. what about bindings?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you clarify what you'd like here? The core to_record_batch() method is available on LookupResult. For Python, we could add a lookup_arrow() method on Lookuper that returns a PyArrow RecordBatch — but I wanted to confirm the expected API shape before adding it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. let's leave the bindings out of scope for now |
||
| let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?; | ||
|
|
||
| for bytes in &self.rows { | ||
| let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. do you mind also changing |
||
| Error::RowConvertError { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. different error types are used for the same schema check — let's use RowConvertError in the other places as well |
||
| message: format!( | ||
| "LookupResult row payload too short: {} bytes, need at least {} bytes for schema id", | ||
| bytes.len(), | ||
| SCHEMA_ID_LENGTH | ||
| ), | ||
| } | ||
| })?; | ||
|
|
||
| let row = CompactedRow::from_bytes(&self.row_type, payload); | ||
| builder.append(&row)?; | ||
| } | ||
|
|
||
| builder.build_arrow_record_batch().map(Arc::unwrap_or_clone) | ||
| } | ||
| } | ||
|
|
||
| /// Configuration and factory struct for creating lookup operations. | ||
|
|
@@ -306,3 +356,68 @@ impl Lookuper { | |
| &self.table_info | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::metadata::{DataField, DataTypes}; | ||
| use crate::row::binary::BinaryWriter; | ||
| use crate::row::compacted::CompactedRowWriter; | ||
| use arrow::array::Int32Array; | ||
|
|
||
| fn make_row_bytes(schema_id: i16, row_data: &[u8]) -> Vec<u8> { | ||
| let mut bytes = Vec::with_capacity(SCHEMA_ID_LENGTH + row_data.len()); | ||
| bytes.extend_from_slice(&schema_id.to_le_bytes()); | ||
| bytes.extend_from_slice(row_data); | ||
| bytes | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_to_record_batch_empty() { | ||
| let row_type = Arc::new(RowType::new(vec![DataField::new( | ||
| "id", | ||
| DataTypes::int(), | ||
| None, | ||
| )])); | ||
| let result = LookupResult::empty(row_type); | ||
| let batch = result.to_record_batch().unwrap(); | ||
| assert_eq!(batch.num_rows(), 0); | ||
| assert_eq!(batch.num_columns(), 1); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_to_record_batch_with_row() { | ||
| let row_type = Arc::new(RowType::new(vec![DataField::new( | ||
| "id", | ||
| DataTypes::int(), | ||
| None, | ||
| )])); | ||
|
|
||
| let mut writer = CompactedRowWriter::new(1); | ||
| writer.write_int(42); | ||
| let row_bytes = make_row_bytes(0, writer.buffer()); | ||
|
|
||
| let result = LookupResult::new(vec![row_bytes], Arc::clone(&row_type)); | ||
| let batch = result.to_record_batch().unwrap(); | ||
|
|
||
| assert_eq!(batch.num_rows(), 1); | ||
| let col = batch | ||
| .column(0) | ||
| .as_any() | ||
| .downcast_ref::<Int32Array>() | ||
| .unwrap(); | ||
| assert_eq!(col.value(0), 42); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_to_record_batch_payload_too_short() { | ||
| let row_type = Arc::new(RowType::new(vec![DataField::new( | ||
| "id", | ||
| DataTypes::int(), | ||
| None, | ||
| )])); | ||
| // Only 1 byte — shorter than SCHEMA_ID_LENGTH (2) | ||
| let result = LookupResult::new(vec![vec![0u8]], Arc::clone(&row_type)); | ||
| assert!(result.to_record_batch().is_err()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
why did we remove this?