137 changes: 126 additions & 11 deletions crates/fluss/src/client/table/lookup.rs
@@ -20,13 +20,15 @@ use crate::client::metadata::Metadata;
use crate::client::table::partition_getter::PartitionGetter;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::record::RowAppendRecordBatchBuilder;
use crate::record::kv::SCHEMA_ID_LENGTH;
use crate::row::InternalRow;
use crate::row::compacted::CompactedRow;
use crate::row::encode::{KeyEncoder, KeyEncoderFactory};
use crate::rpc::ApiError;
use crate::rpc::RpcClient;
use crate::rpc::message::LookupRequest;
use arrow::array::RecordBatch;
use std::sync::Arc;

/// The result of a lookup operation.
@@ -59,32 +61,80 @@ impl LookupResult {
/// `CompactedRow` borrows from this result set and cannot outlive it.
///
/// # Returns
/// - `Ok(Some(row))`: If exactly one row exists.
/// - `Ok(None)`: If the result set is empty.
/// - `Err(Error::UnexpectedError)`: If the result set contains more than one row.
Contributor: why did we remove this?

/// - `Ok(rows)` - All rows in the result set.
/// - `Err(Error)` - If any row payload is too short to contain a schema id.
///
pub fn get_single_row(&self) -> Result<Option<CompactedRow<'_>>> {
match self.rows.len() {
0 => Ok(None),
1 => Ok(Some(CompactedRow::from_bytes(
&self.row_type,
&self.rows[0][SCHEMA_ID_LENGTH..],
))),
1 => {
let payload =
self.rows[0]
.get(SCHEMA_ID_LENGTH..)
.ok_or_else(|| Error::UnexpectedError {
message: format!(
"Row payload too short: {} bytes, need at least {} for schema id",
self.rows[0].len(),
SCHEMA_ID_LENGTH
),
source: None,
})?;
Ok(Some(CompactedRow::from_bytes(&self.row_type, payload)))
}
_ => Err(Error::UnexpectedError {
message: "LookupResult contains multiple rows, use get_rows() instead".to_string(),
source: None,
}),
}
}

/// Returns all rows as CompactedRows.
pub fn get_rows(&self) -> Vec<CompactedRow<'_>> {
pub fn get_rows(&self) -> Result<Vec<CompactedRow<'_>>> {
Contributor: we changed the public API; shall we update the docs?
cc @luoyuxia: also, do we want to make a breaking change for this schema check, or do we trust the server everywhere?

self.rows
.iter()
// TODO Add schema id check and fetch when implementing prefix lookup
Contributor: this comment should stay

.map(|bytes| CompactedRow::from_bytes(&self.row_type, &bytes[SCHEMA_ID_LENGTH..]))
.map(|bytes| {
let payload =
bytes
.get(SCHEMA_ID_LENGTH..)
Contributor: the schema checks are duplicated; let's move them into a helper, wdyt?

.ok_or_else(|| Error::UnexpectedError {
message: format!(
"Row payload too short: {} bytes, need at least {} for schema id",
bytes.len(),
SCHEMA_ID_LENGTH
),
source: None,
})?;
Ok(CompactedRow::from_bytes(&self.row_type, payload))
})
.collect()
}
/// Converts all rows in this result into an Arrow [`RecordBatch`].
///
/// This is useful for integration with DataFusion or other Arrow-based tools.
///
/// # Returns
/// - `Ok(RecordBatch)` - All rows in columnar Arrow format. Returns an empty
/// batch (with the correct schema) if the result set is empty.
/// - `Err(Error)` - If the conversion fails.
pub fn to_record_batch(&self) -> Result<RecordBatch> {
Contributor: what about bindings?

Contributor (author): Could you clarify what you'd like here? The core to_record_batch() method is available on LookupResult. For Python, we could add a lookup_arrow() method on Lookuper that returns a PyArrow RecordBatch, but I wanted to confirm the expected API shape before adding it.

Contributor: let's leave the bindings out of scope for now. We probably want just to_arrow on the lookup result, but better in a follow-up PR.

let mut builder = RowAppendRecordBatchBuilder::new(&self.row_type)?;

for bytes in &self.rows {
let payload = bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
Contributor: do you mind also changing get_single_row() and get_rows() to this pattern? Otherwise it's inconsistent with this change.

Error::RowConvertError {
Contributor: different error types for the same schema check; let's use RowConvertError in the other places as well.

message: format!(
"LookupResult row payload too short: {} bytes, need at least {} bytes for schema id",
bytes.len(),
SCHEMA_ID_LENGTH
),
}
})?;

let row = CompactedRow::from_bytes(&self.row_type, payload);
builder.append(&row)?;
}

builder.build_arrow_record_batch().map(Arc::unwrap_or_clone)
}
}
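The same length check now appears in `get_single_row()`, `get_rows()`, and `to_record_batch()`. The helper the reviewer suggests could look roughly like this; it is a standalone sketch, with a plain `String` error and a local `SCHEMA_ID_LENGTH` standing in for the crate's `Error::RowConvertError` and `record::kv::SCHEMA_ID_LENGTH`:

```rust
// The schema id is an i16 on the wire, so the prefix is 2 bytes
// (matching the `to_le_bytes` framing used in the tests below).
const SCHEMA_ID_LENGTH: usize = 2;

/// Strips the schema-id prefix, rejecting payloads that are too short.
fn strip_schema_id(bytes: &[u8]) -> Result<&[u8], String> {
    bytes.get(SCHEMA_ID_LENGTH..).ok_or_else(|| {
        format!(
            "Row payload too short: {} bytes, need at least {} for schema id",
            bytes.len(),
            SCHEMA_ID_LENGTH
        )
    })
}

fn main() {
    // 2-byte schema id (little-endian 0) followed by one byte of row data.
    assert_eq!(strip_schema_id(&[0, 0, 42]).unwrap(), &[42u8][..]);
    // Shorter than the schema id prefix: must be rejected.
    assert!(strip_schema_id(&[0]).is_err());
}
```

With this in place, each of the three methods reduces to `strip_schema_id(bytes)?` followed by `CompactedRow::from_bytes`, and the error type is uniform across all call sites.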

/// Configuration and factory struct for creating lookup operations.
@@ -306,3 +356,68 @@ impl Lookuper {
&self.table_info
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::{DataField, DataTypes};
use crate::row::binary::BinaryWriter;
use crate::row::compacted::CompactedRowWriter;
use arrow::array::Int32Array;

fn make_row_bytes(schema_id: i16, row_data: &[u8]) -> Vec<u8> {
let mut bytes = Vec::with_capacity(SCHEMA_ID_LENGTH + row_data.len());
bytes.extend_from_slice(&schema_id.to_le_bytes());
bytes.extend_from_slice(row_data);
bytes
}

#[test]
fn test_to_record_batch_empty() {
let row_type = Arc::new(RowType::new(vec![DataField::new(
"id",
DataTypes::int(),
None,
)]));
let result = LookupResult::empty(row_type);
let batch = result.to_record_batch().unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.num_columns(), 1);
}

#[test]
fn test_to_record_batch_with_row() {
let row_type = Arc::new(RowType::new(vec![DataField::new(
"id",
DataTypes::int(),
None,
)]));

let mut writer = CompactedRowWriter::new(1);
writer.write_int(42);
let row_bytes = make_row_bytes(0, writer.buffer());

let result = LookupResult::new(vec![row_bytes], Arc::clone(&row_type));
let batch = result.to_record_batch().unwrap();

assert_eq!(batch.num_rows(), 1);
let col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(col.value(0), 42);
}

#[test]
fn test_to_record_batch_payload_too_short() {
let row_type = Arc::new(RowType::new(vec![DataField::new(
"id",
DataTypes::int(),
None,
)]));
// Only 1 byte — shorter than SCHEMA_ID_LENGTH (2)
let result = LookupResult::new(vec![vec![0u8]], Arc::clone(&row_type));
assert!(result.to_record_batch().is_err());
}
}
9 changes: 9 additions & 0 deletions crates/fluss/src/record/arrow.rs
@@ -319,6 +319,15 @@ impl RowAppendRecordBatchBuilder {
}),
}
}
/// Appends a row to the builder.
pub fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
ArrowRecordBatchInnerBuilder::append(self, row)
}

/// Builds the final Arrow RecordBatch.
pub fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
ArrowRecordBatchInnerBuilder::build_arrow_record_batch(self)
}
}

impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
1 change: 1 addition & 0 deletions website/docs/user-guide/rust/api-reference.md
@@ -202,6 +202,7 @@ for record in records {
|----------------------------------------------------------------|----------------------------------|
| `fn get_single_row(&self) -> Result<Option<impl InternalRow>>` | Get a single row from the result |
| `fn get_rows(&self) -> Vec<impl InternalRow>` | Get all rows from the result |
| `fn to_record_batch(&self) -> Result<RecordBatch>` | Convert all rows to an Arrow `RecordBatch` for DataFusion or other Arrow-based tools |

## `WriteResultFuture`

8 changes: 8 additions & 0 deletions website/docs/user-guide/rust/example/primary-key-tables.md
@@ -112,3 +112,11 @@ if let Some(row) = result.get_single_row()? {
println!("Record not found");
}
```

## Looking Up Records as Arrow RecordBatch

Use `to_record_batch()` to get lookup results in Arrow format, for example when integrating with DataFusion.

```rust
let result = lookuper.lookup(&key).await?;
let batch = result.to_record_batch()?;
println!("Rows: {}", batch.num_rows());
```
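From there, the batch can be handed to Arrow-based tools directly. The sketch below assumes the `datafusion` crate is available and uses its `MemTable` and `SessionContext` types; the table name `lookup_result` is arbitrary:

```rust
use std::sync::Arc;
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

// `result` is the LookupResult from the lookup above.
let batch = result.to_record_batch()?;

// Wrap the single batch in an in-memory table and register it.
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
let ctx = SessionContext::new();
ctx.register_table("lookup_result", Arc::new(table))?;

// Query the looked-up rows with SQL.
let df = ctx.sql("SELECT * FROM lookup_result").await?;
df.show().await?;
```

Because `to_record_batch()` returns an empty batch with the correct schema when nothing was found, the registration above works even for a miss.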