feat: Add array data type support #433
    let (val, next) = reader.read_long(cursor);
    let decimal = Decimal::from_unscaled_long(val, precision, scale)
        .expect("Failed to create decimal from unscaled long");
    let decimal =
I think we should convert all `expect` calls to typed errors, so that handling a malformed array payload does not leave a panic backdoor.
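A minimal sketch of what that suggestion could look like, using only the standard library. `DecodeError`, `read_long`, `decimal_from_unscaled`, and `decode_decimal` are hypothetical stand-ins, not the crate's real API, and the fixed-width little-endian long is an assumption made to keep the example short; the real reader may encode longs differently.

```rust
use std::fmt;

/// Hypothetical error type for this sketch (not the crate's real error enum).
#[derive(Debug, PartialEq)]
enum DecodeError {
    UnexpectedEof { needed: usize, available: usize },
    InvalidDecimal { unscaled: i64, precision: u32 },
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnexpectedEof { needed, available } => {
                write!(f, "need {needed} bytes, only {available} available")
            }
            Self::InvalidDecimal { unscaled, precision } => {
                write!(f, "unscaled value {unscaled} exceeds precision {precision}")
            }
        }
    }
}

impl std::error::Error for DecodeError {}

/// Reads a fixed-width little-endian i64 at `cursor` (assumed encoding).
/// Returns the value and the next cursor, or an error on a short buffer.
fn read_long(buf: &[u8], cursor: usize) -> Result<(i64, usize), DecodeError> {
    let available = buf.len().saturating_sub(cursor);
    if available < 8 {
        return Err(DecodeError::UnexpectedEof { needed: 8, available });
    }
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&buf[cursor..cursor + 8]);
    Ok((i64::from_le_bytes(raw), cursor + 8))
}

/// Rejects unscaled values that have more digits than `precision` allows.
fn decimal_from_unscaled(unscaled: i64, precision: u32) -> Result<i64, DecodeError> {
    match 10u64.checked_pow(precision) {
        Some(limit) if unscaled.unsigned_abs() >= limit => {
            Err(DecodeError::InvalidDecimal { unscaled, precision })
        }
        // Either the value fits, or the precision is wide enough for any i64.
        _ => Ok(unscaled),
    }
}

/// Both failure modes propagate with `?` instead of panicking via `expect`.
fn decode_decimal(
    buf: &[u8],
    cursor: usize,
    precision: u32,
) -> Result<(i64, usize), DecodeError> {
    let (val, next) = read_long(buf, cursor)?;
    let decimal = decimal_from_unscaled(val, precision)?;
    Ok((decimal, next))
}
```

With this shape, a truncated or out-of-range payload surfaces to the caller as an `Err` value rather than aborting the process.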
    // silently widen key semantics.
    if matches!(
        field_type,
        DataType::Array(_) | DataType::Map(_) | DataType::Row(_)
This applies to all of array/map/row. I understand this PR only adds Array, but I think it is better to reject all of them here.
If there is an objection, I can remove it.
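The check in the snippet above can be sketched in isolation as follows. The `DataType` enum here is a simplified, hypothetical stand-in with only a few variants, and `validate_key_type` is an illustrative name rather than a function from the PR.

```rust
/// Simplified stand-in for the crate's `DataType` (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int,
    String,
    Array(Box<DataType>),
    Map(Box<DataType>, Box<DataType>),
    Row(Vec<DataType>),
}

/// Rejects complex types up front so they cannot silently become key columns.
fn validate_key_type(field_type: &DataType) -> Result<(), String> {
    if matches!(
        field_type,
        DataType::Array(_) | DataType::Map(..) | DataType::Row(_)
    ) {
        return Err(format!(
            "{field_type:?} is not supported as a key column type"
        ));
    }
    Ok(())
}
```

Returning an error for all three complex variants in one place keeps the behavior consistent once Map and Row support is added later.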
@charlesdong1991 Thanks for the great pr. I'll find some time to review
Pull request overview
Adds end-to-end ARRAY column support to the Fluss Rust client, including a Java-compatible binary array representation and integration across row encoders/decoders, Arrow interop, and public APIs.
Changes:
- Introduces `FlussArray`/`FlussArrayWriter` (BinaryArray layout) and wires it into `Datum`, `InternalRow`, and runtime field/value writers.
- Adds compacted row read/write support for arrays (length-prefixed BinaryArray bytes) and rejects `ARRAY` as a key column type.
- Extends Arrow conversion logic and updates user/docs + tests for primitive, nullable, empty, and nested arrays.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| website/docs/user-guide/rust/data-types.md | Documents ARRAY<T> type mapping and how to construct/read arrays. |
| website/docs/user-guide/rust/api-reference.md | Adds InternalRow::get_array and documents FlussArray API. |
| crates/fluss/src/row/mod.rs | Exposes binary_array module and re-exports FlussArray; adds InternalRow::get_array. |
| crates/fluss/src/row/field_getter.rs | Adds Array field getter + tests. |
| crates/fluss/src/row/encode/compacted_key_encoder.rs | Adds test ensuring array types are rejected in key encoding. |
| crates/fluss/src/row/datum.rs | Adds Datum::Array and Arrow ListBuilder append/conversion logic. |
| crates/fluss/src/row/compacted/compacted_row_writer.rs | Adds write_array (delegates to length-prefixed bytes). |
| crates/fluss/src/row/compacted/compacted_row_reader.rs | Makes deserialization fallible and adds array decoding via FlussArray::from_bytes. |
| crates/fluss/src/row/compacted/compacted_row.rs | Propagates fallible deserialization and adds InternalRow::get_array + array tests. |
| crates/fluss/src/row/compacted/compacted_key_writer.rs | Explicitly rejects complex key types and adds write_array to the delegated writer surface. |
| crates/fluss/src/row/column.rs | Implements ColumnarRow::get_array by converting Arrow ListArray → FlussArray; adds tests. |
| crates/fluss/src/row/binary_array.rs | New Java-compatible binary array implementation + writer + tests. |
| crates/fluss/src/row/binary/binary_writer.rs | Adds BinaryWriter::write_array and InnerValueWriter::Array. |
| crates/fluss/src/record/arrow.rs | Adds Arrow List builder support and from_arrow_type mapping for list element conversion. |
| bindings/cpp/src/types.rs | Propagates Datum::Array handling in type resolution and row ownership conversion. |
@charlesdong1991 LGTM overall, left minor comments
If you fixed something in the meantime, good. Sorry for the noise, as I started reviewing before the new code :)
Hi @fresh-borzoni thanks a lot!
leekeiabstraction left a comment
Thank you for the PR! Left some comments. PTAL
    let inner_builder = Self::create_builder(field.data_type(), capacity)?;
    Ok(Box::new(ListBuilder::with_capacity(inner_builder, capacity)))
This reads like we are over-provisioning here with capacity^2?
It's capacity + capacity, because there is an offset buffer and a values buffer. That's fine, though for the values buffer we assume the lower bound of one element per list, which is fine for now I guess.
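To make the "capacity + capacity" point concrete, here is a simplified model of a list builder with one offsets buffer and one flat values buffer. `ListBuilderSketch` is a hypothetical stand-in written against the standard library only; it is not Arrow's `ListBuilder`, and it assumes `i64` elements purely for illustration.

```rust
/// Simplified model of a list builder: one offsets buffer plus one flat
/// values buffer. Not the real Arrow `ListBuilder`.
struct ListBuilderSketch {
    offsets: Vec<i32>,
    values: Vec<i64>,
}

impl ListBuilderSketch {
    /// Reserves room for `capacity` lists (offsets) and, as a lower bound of
    /// one element per list, `capacity` values. The two reservations add up;
    /// they do not multiply.
    fn with_capacity(capacity: usize) -> Self {
        let mut offsets = Vec::with_capacity(capacity + 1);
        offsets.push(0); // the first list always starts at offset 0
        Self {
            offsets,
            values: Vec::with_capacity(capacity),
        }
    }

    /// Appends one list: its elements go into the shared values buffer and
    /// its end position is recorded in the offsets buffer.
    fn append(&mut self, list: &[i64]) {
        self.values.extend_from_slice(list);
        self.offsets.push(self.values.len() as i32);
    }
}
```

Lists with more than one element simply make the values buffer grow past its initial reservation, which is why the one-element estimate is a lower bound rather than an over-allocation.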
    /// Converts an Arrow data type back to a Fluss `DataType`.
    /// Used for reading array elements from Arrow ListArray back into Fluss types.
    pub fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
Should this be `pub(crate)`?
Also, have you checked whether there is an already existing function that does something similar? (It reads familiar, but I do not have access to the codebase right now.)
+1 for `pub(crate)`, but I don't think we have a similar function.
    // TIMESTAMP_LTZ(1): 1698235273182
    0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
    // TIMESTAMP_LTZ(5): 1698235273182
    0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
I vaguely remember that the Java side does encode arrays.
See this PR, which was created to give us a way to check that compacted key encoding is consistent across both Java and Rust.
+1, it seems that the server rejects arrays as primary keys, but encoding them is allowed.
fresh-borzoni left a comment
@charlesdong1991 TY, looked through again, left comments
PTAL
        data.truncate(self.cursor);
        FlussArray::from_bytes(&data)
    }
We already own the `data` Vec, so `from_bytes` would make an unnecessary copy.
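A small sketch of the ownership pattern being suggested. `ArraySketch` and its `from_vec` constructor are hypothetical names used only for illustration; whether `FlussArray` exposes or should expose such a constructor is for the PR to decide.

```rust
/// Hypothetical stand-in for `FlussArray`; only the ownership pattern matters.
#[derive(Debug)]
struct ArraySketch {
    data: Vec<u8>,
}

impl ArraySketch {
    /// Borrowing constructor: it has to copy the slice in order to own it.
    fn from_bytes(bytes: &[u8]) -> Self {
        Self { data: bytes.to_vec() }
    }

    /// Owning constructor (hypothetical name): takes the buffer as-is.
    fn from_vec(data: Vec<u8>) -> Self {
        Self { data }
    }
}

/// Mirrors the reviewed snippet: trim the buffer to the written length,
/// then move it into the array instead of copying it.
fn finish(mut data: Vec<u8>, cursor: usize) -> ArraySketch {
    data.truncate(cursor);
    ArraySketch::from_vec(data)
}
```

Because `truncate` does not reallocate and the `Vec` is moved, the resulting array reuses the writer's heap allocation.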
            ),
        });
    }
    let raw_size = i32::from_ne_bytes(data[0..4].try_into().unwrap());

    pub fn get_short(&self, pos: usize) -> i16 {
        let offset = self.element_offset(pos, 2);
        i16::from_ne_bytes(self.data[offset..offset + 2].try_into().unwrap())
ditto and all other cases in this file
        writer.set_null_at(i);
    } else {
        let nested_values = list_arr.value(i);
        let nested_element_type = from_arrow_type(nested_values.data_type())?;
We allocate a `DataType` on every loop iteration, which is wasteful. Shall we hoist it?
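A sketch of the hoisting, assuming (as holds for an Arrow list array) that every element of the list shares one value type. `ElemType`, `resolve_type`, and `describe_lists` are hypothetical stand-ins for the real types and for `from_arrow_type`.

```rust
/// Hypothetical stand-in for the Fluss `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum ElemType {
    Int,
    Str,
}

/// Hypothetical stand-in for `from_arrow_type`: may fail and may allocate.
fn resolve_type(name: &str) -> Result<ElemType, String> {
    match name {
        "int32" => Ok(ElemType::Int),
        "utf8" => Ok(ElemType::Str),
        other => Err(format!("unsupported type: {other}")),
    }
}

/// Resolves the element type once, before the loop, and reuses it for every
/// non-null entry instead of converting it again on each iteration.
fn describe_lists(
    type_name: &str,
    lists: &[Option<Vec<i32>>],
) -> Result<Vec<String>, String> {
    let element_type = resolve_type(type_name)?; // hoisted out of the loop
    let mut out = Vec::with_capacity(lists.len());
    for list in lists {
        match list {
            None => out.push("null".to_string()),
            Some(values) => {
                out.push(format!("{:?} x {}", element_type, values.len()))
            }
        }
    }
    Ok(out)
}
```

One behavioral difference to keep in mind: after hoisting, an unsupported element type is reported even when the input is empty or contains only nulls.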
        (offset, size)
    }

    pub fn get_string(&self, pos: usize) -> Result<&str> {
Why do we return `Result` in half of the getters while the other half go unchecked?
I doubt that Java is inconsistent in the same way; please check.
    fn get_array(&self, pos: usize) -> Result<FlussArray> {
        match self.get_value(pos)? {
            Datum::Array(a) => Ok(a.clone()),
It clones on every call; maybe we should use `Bytes` for zero-copy?
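The effect of a shared, reference-counted buffer can be sketched with the standard library's `Arc<[u8]>`, used here as a stand-in for the `Bytes` type the comment mentions. `SharedArray` is a hypothetical name, not the PR's `FlussArray`.

```rust
use std::sync::Arc;

/// Hypothetical array backed by a reference-counted buffer. `Arc<[u8]>` is a
/// standard-library stand-in for `Bytes` in this sketch.
#[derive(Debug, Clone)]
struct SharedArray {
    data: Arc<[u8]>,
}

impl SharedArray {
    /// Moves the bytes into a shared allocation once, at construction time.
    fn from_vec(data: Vec<u8>) -> Self {
        Self { data: Arc::from(data) }
    }

    /// Borrows the underlying bytes without copying.
    fn as_bytes(&self) -> &[u8] {
        &self.data
    }
}
```

Cloning a `SharedArray` only bumps a reference count, so a getter that returns `a.clone()` on every call no longer copies the payload.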
Purpose
Linked issue: close #386
Brief change log
Add end-to-end ARRAY column support in fluss-rust
Tests
Yes, all tests pass locally.