feat: Add array data type support #433

Open
charlesdong1991 wants to merge 6 commits into apache:main from charlesdong1991:issue-386

@charlesdong1991
Contributor

Purpose

Linked issue: close #386

Brief change log

Add end-to-end ARRAY column support in fluss-rust

Tests

Yes, all tests pass locally.

let (val, next) = reader.read_long(cursor);
let decimal = Decimal::from_unscaled_long(val, precision, scale)
.expect("Failed to create decimal from unscaled long");
let decimal =
Contributor Author

@charlesdong1991 Mar 7, 2026

I think we should convert all `expect` calls to typed errors, so that handling a malformed array payload does not leave a panic backdoor.
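A minimal sketch of what replacing `expect` with a typed error could look like. The `DecodeError` type and the `read_fixed_i64` helper are hypothetical names invented for illustration; they are not the crate's real error type or reader API, and the fixed-width read here does not reproduce the compacted row encoding.

```rust
use std::fmt;

/// Hypothetical typed error for malformed payloads (not the crate's real error type).
#[derive(Debug, PartialEq)]
enum DecodeError {
    UnexpectedEof { needed: usize, available: usize },
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DecodeError::UnexpectedEof { needed, available } => write!(
                f,
                "unexpected end of payload: needed {} bytes, {} available",
                needed, available
            ),
        }
    }
}

impl std::error::Error for DecodeError {}

/// Reads a fixed-width little-endian i64 at `cursor`. Returns the value and
/// the next cursor position, or a typed error instead of panicking.
fn read_fixed_i64(buf: &[u8], cursor: usize) -> Result<(i64, usize), DecodeError> {
    let eof = DecodeError::UnexpectedEof {
        needed: 8,
        available: buf.len().saturating_sub(cursor),
    };
    let end = match cursor.checked_add(8) {
        Some(end) => end,
        None => return Err(eof),
    };
    // `get` returns None when the range is out of bounds, so no panic is possible.
    let bytes = match buf.get(cursor..end) {
        Some(bytes) => bytes,
        None => return Err(eof),
    };
    let mut raw = [0u8; 8];
    raw.copy_from_slice(bytes); // lengths match: `bytes` is exactly 8 bytes long
    Ok((i64::from_le_bytes(raw), end))
}
```

With this shape, a truncated payload surfaces as `Err(DecodeError::UnexpectedEof { .. })`, which the caller can propagate with `?` rather than aborting the process.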

// silently widen key semantics.
if matches!(
field_type,
DataType::Array(_) | DataType::Map(_) | DataType::Row(_)
Contributor Author

This applies to all of array/map/row. I understand this PR only adds Array, but I think it is better to cover all of them here.
If there is any objection, I can remove it.
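The check discussed above can be sketched as follows. The `DataType` enum here is a simplified, hypothetical stand-in (the real one in the crate has more variants and different payloads), and `validate_key_type` is an illustrative name, not an existing function.

```rust
/// Simplified, hypothetical stand-in for the crate's DataType.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int,
    String,
    Array(Box<DataType>),
    Map(Box<DataType>, Box<DataType>),
    Row(Vec<DataType>),
}

/// Rejects complex types as key columns up front, so that key semantics
/// are never silently widened by an encoder that happens to accept them.
fn validate_key_type(field_type: &DataType) -> Result<(), String> {
    if matches!(
        field_type,
        DataType::Array(_) | DataType::Map(..) | DataType::Row(_)
    ) {
        return Err(format!("unsupported key column type: {:?}", field_type));
    }
    Ok(())
}
```

Whether rejecting at encode time is the right place is exactly what the later review comments question, so treat this as a sketch of the PR's current approach rather than a settled design.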

@luoyuxia
Contributor

luoyuxia commented Mar 8, 2026

@charlesdong1991 Thanks for the great PR. I'll find some time to review.

Copilot AI left a comment

Pull request overview

Adds end-to-end ARRAY column support to the Fluss Rust client, including a Java-compatible binary array representation and integration across row encoders/decoders, Arrow interop, and public APIs.

Changes:

  • Introduces FlussArray / FlussArrayWriter (BinaryArray layout) and wires it into Datum, InternalRow, and runtime field/value writers.
  • Adds compacted row read/write support for arrays (length-prefixed BinaryArray bytes) and rejects ARRAY as a key column type.
  • Extends Arrow conversion logic and updates user/docs + tests for primitive, nullable, empty, and nested arrays.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| website/docs/user-guide/rust/data-types.md | Documents ARRAY<T> type mapping and how to construct/read arrays. |
| website/docs/user-guide/rust/api-reference.md | Adds InternalRow::get_array and documents FlussArray API. |
| crates/fluss/src/row/mod.rs | Exposes binary_array module and re-exports FlussArray; adds InternalRow::get_array. |
| crates/fluss/src/row/field_getter.rs | Adds Array field getter + tests. |
| crates/fluss/src/row/encode/compacted_key_encoder.rs | Adds test ensuring array types are rejected in key encoding. |
| crates/fluss/src/row/datum.rs | Adds Datum::Array and Arrow ListBuilder append/conversion logic. |
| crates/fluss/src/row/compacted/compacted_row_writer.rs | Adds write_array (delegates to length-prefixed bytes). |
| crates/fluss/src/row/compacted/compacted_row_reader.rs | Makes deserialization fallible and adds array decoding via FlussArray::from_bytes. |
| crates/fluss/src/row/compacted/compacted_row.rs | Propagates fallible deserialization and adds InternalRow::get_array + array tests. |
| crates/fluss/src/row/compacted/compacted_key_writer.rs | Explicitly rejects complex key types and adds write_array to the delegated writer surface. |
| crates/fluss/src/row/column.rs | Implements ColumnarRow::get_array by converting Arrow ListArray to FlussArray; adds tests. |
| crates/fluss/src/row/binary_array.rs | New Java-compatible binary array implementation + writer + tests. |
| crates/fluss/src/row/binary/binary_writer.rs | Adds BinaryWriter::write_array and InnerValueWriter::Array. |
| crates/fluss/src/record/arrow.rs | Adds Arrow List builder support and from_arrow_type mapping for list element conversion. |
| bindings/cpp/src/types.rs | Propagates Datum::Array handling in type resolution and row ownership conversion. |

Contributor

@fresh-borzoni left a comment

@charlesdong1991 LGTM overall, left minor comments.
If you fixed something in the meantime, good; sorry for the noise, as I started reviewing before the new code :)

@charlesdong1991
Contributor Author

Hi @fresh-borzoni, thanks a lot!
It would be great if you could do another round of review, since I made some changes during your review ^^

Contributor

@leekeiabstraction left a comment

Thank you for the PR! Left some comments. PTAL

Comment on lines +334 to +335
let inner_builder = Self::create_builder(field.data_type(), capacity)?;
Ok(Box::new(ListBuilder::with_capacity(inner_builder, capacity)))
Contributor

This reads like we are over-provisioning here with capacity^2?

Contributor

It's capacity + capacity, because there is an offsets buffer and a values buffer, so it's fine. Note that for the values buffer we assume a lower bound of 1 element per list, which is fine for now I guess.
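To make the "capacity + capacity" point concrete, here is a simplified std-only model of a list builder. `ListModel` is a hypothetical stand-in, not the arrow crate's `ListBuilder`; it only mirrors the layout idea that a list column keeps one offsets buffer (one entry per list, plus a leading zero) and one flattened values buffer.

```rust
/// Simplified model of an Arrow-style list builder (not the arrow crate's type).
struct ListModel {
    /// Running end offsets: one entry per appended list, plus a leading 0.
    offsets: Vec<i32>,
    /// Flattened child elements of all appended lists.
    values: Vec<i64>,
}

impl ListModel {
    /// Pre-allocates room for `capacity` lists and, as a lower-bound guess,
    /// `capacity` child values (one element per list).
    fn with_capacity(capacity: usize) -> Self {
        let mut offsets = Vec::with_capacity(capacity + 1);
        offsets.push(0);
        ListModel {
            offsets,
            values: Vec::with_capacity(capacity),
        }
    }

    /// Appends one list: child values go to the shared values buffer and
    /// a single new end offset is recorded.
    fn append(&mut self, list: &[i64]) {
        self.values.extend_from_slice(list);
        self.offsets.push(self.values.len() as i32);
    }
}
```

The two allocations are independent and each is linear in `capacity`, so the total reservation grows as roughly `2 * capacity`, not `capacity * capacity`. Lists longer than one element simply cause the values buffer to grow on demand.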


/// Converts an Arrow data type back to a Fluss `DataType`.
/// Used for reading array elements from Arrow ListArray back into Fluss types.
pub fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
Contributor

Should this be pub(crate)?

Also, have you checked whether an existing function already does something similar? (It reads familiar, but I do not have access to the codebase right now.)

Contributor

+1 for pub(crate), but I don't think we have a similar function.

// TIMESTAMP_LTZ(1): 1698235273182
0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
// TIMESTAMP_LTZ(5): 1698235273182
0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
Contributor

I vaguely remember that the Java side does encode arrays.

See this PR which was created to give us a way to check that compacted key encoding is consistent across both Java and Rust

https://github.com/apache/fluss/pull/2312/files

Contributor

+1, it seems that the server rejects arrays as primary keys, but encoding is allowed.

Contributor

@fresh-borzoni left a comment

@charlesdong1991 TY, looked through again, left comments
PTAL

data.truncate(self.cursor);
FlussArray::from_bytes(&data)
}

Contributor

@fresh-borzoni Mar 8, 2026

We already own the data Vec, so from_bytes makes an unnecessary copy.
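The difference between a borrowing and an owning constructor can be sketched like this. `OwnedArray` and its `from_vec` constructor are hypothetical names for illustration only; whether `FlussArray` offers (or should offer) such a constructor is not stated in this thread.

```rust
/// Hypothetical array wrapper that owns its backing bytes.
struct OwnedArray {
    data: Vec<u8>,
}

impl OwnedArray {
    /// Borrowing constructor: has to copy the slice into a new allocation.
    fn from_bytes(bytes: &[u8]) -> Self {
        OwnedArray { data: bytes.to_vec() }
    }

    /// Owning constructor: takes over the caller's Vec without copying.
    fn from_vec(data: Vec<u8>) -> Self {
        OwnedArray { data }
    }

    /// Address of the backing buffer, used here only to observe copies.
    fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }
}
```

When the caller has already truncated an owned `Vec<u8>` to the right length, passing it by value keeps the same heap buffer, whereas going through a `&[u8]` constructor allocates and copies again.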

),
});
}
let raw_size = i32::from_ne_bytes(data[0..4].try_into().unwrap());
Contributor

I think Java uses LE (little-endian) here.
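For context, `from_ne_bytes` interprets bytes in the host's native order, so the decoded value depends on the machine, while `from_le_bytes` always uses little-endian. A minimal sketch of a fixed-order read, assuming (per the comment above) that the wire format is little-endian; `read_i32_le` is a hypothetical helper name:

```rust
/// Reads a 4-byte little-endian i32 at `offset`, or returns None if the
/// slice is too short. The result is the same on every host architecture.
fn read_i32_le(data: &[u8], offset: usize) -> Option<i32> {
    let end = offset.checked_add(4)?;
    let bytes = data.get(offset..end)?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(bytes); // lengths match: `bytes` is exactly 4 bytes long
    Some(i32::from_le_bytes(raw))
}
```

The same four bytes decode to a different number under big-endian interpretation, which is why a native-endian read would silently break on a big-endian host.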


pub fn get_short(&self, pos: usize) -> i16 {
let offset = self.element_offset(pos, 2);
i16::from_ne_bytes(self.data[offset..offset + 2].try_into().unwrap())
Contributor

Ditto, and the same for all other cases in this file.

writer.set_null_at(i);
} else {
let nested_values = list_arr.value(i);
let nested_element_type = from_arrow_type(nested_values.data_type())?;
Contributor

We allocate a DataType on every loop iteration, which is wasteful. Shall we hoist it?
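A small sketch of the hoisting idea, using hypothetical stand-ins: `ElemType` plays the role of a nested data type, `resolve_type` stands in for a conversion that allocates, and the `calls` counter exists only to make the number of conversions observable. It relies on the fact that all entries of one list column share the same element type, so the type only needs to be resolved once.

```rust
use std::cell::Cell;

/// Hypothetical nested element type; nesting requires a heap allocation.
#[derive(Debug, Clone, PartialEq)]
enum ElemType {
    Int,
    List(Box<ElemType>),
}

/// Stand-in for a type conversion that allocates for nested types.
fn resolve_type(depth: usize, calls: &Cell<usize>) -> ElemType {
    calls.set(calls.get() + 1);
    let mut ty = ElemType::Int;
    for _ in 0..depth {
        ty = ElemType::List(Box::new(ty));
    }
    ty
}

/// Toy per-list work that depends on the element type.
fn encoded_len(list: &[i32], ty: &ElemType) -> usize {
    match ty {
        ElemType::Int => list.len() * 4,
        ElemType::List(_) => list.len() * 8,
    }
}

/// Resolves the element type again for every list.
fn total_len_per_iteration(lists: &[Vec<i32>], depth: usize, calls: &Cell<usize>) -> usize {
    lists
        .iter()
        .map(|list| encoded_len(list, &resolve_type(depth, calls)))
        .sum()
}

/// Resolves the element type once, before the loop, and reuses it.
fn total_len_hoisted(lists: &[Vec<i32>], depth: usize, calls: &Cell<usize>) -> usize {
    let ty = resolve_type(depth, calls);
    lists.iter().map(|list| encoded_len(list, &ty)).sum()
}
```

Both functions compute the same total; the hoisted version performs one conversion regardless of how many lists are processed.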

(offset, size)
}

pub fn get_string(&self, pos: usize) -> Result<&str> {
Contributor

Why do half of the getters return Result while the other half go unchecked?
I doubt that Java is inconsistent in the same way; please check.



fn get_array(&self, pos: usize) -> Result<FlussArray> {
match self.get_value(pos)? {
Datum::Array(a) => Ok(a.clone()),
Contributor

@fresh-borzoni Mar 9, 2026

It clones on every call; maybe we want to use Bytes for zero-copy?
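The zero-copy idea can be sketched with the standard library alone. Here `Arc<[u8]>` is used as a stand-in for a reference-counted byte buffer such as `Bytes` from the `bytes` crate; `SharedArray` is a hypothetical type, not the PR's `FlussArray`.

```rust
use std::sync::Arc;

/// Hypothetical array handle backed by shared, reference-counted bytes.
#[derive(Clone, Debug)]
struct SharedArray {
    data: Arc<[u8]>,
}

impl SharedArray {
    /// Moves the bytes into a shared buffer once, at construction time.
    fn new(bytes: Vec<u8>) -> Self {
        SharedArray { data: Arc::from(bytes) }
    }

    /// Address of the backing buffer, used here only to observe sharing.
    fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }
}
```

Cloning a `SharedArray` only bumps a reference count, so a getter that returns a clone on every call no longer duplicates the payload. The trade-off is one atomic increment per clone and a buffer that lives until the last handle is dropped.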

Development

Successfully merging this pull request may close these issues.

Array data type support in Rust

5 participants