Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ pub fn resolve_row_types(
Datum::Time(t) => Datum::Time(*t),
Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
// TODO: C++ bindings need proper CXX wrapper types for FlussArray
// before C++ users can construct or inspect array values through FFI.
Datum::Array(a) => Datum::Array(a.clone()),
};
out.set_field(idx, resolved);
}
Expand Down Expand Up @@ -408,6 +411,9 @@ pub fn compacted_row_to_owned(
fcore::metadata::DataType::Binary(dt) => {
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
// TODO: C++ bindings need proper CXX wrapper types for FlussArray
// before C++ users can construct or inspect array values through FFI.
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};

Expand Down
80 changes: 76 additions & 4 deletions crates/fluss/src/record/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use crate::row::{ColumnarRow, InternalRow};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder,
Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder,
UInt16Builder, UInt32Builder, UInt64Builder,
Int32Builder, Int64Builder, ListBuilder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
};
use arrow::{
array::RecordBatch,
Expand Down Expand Up @@ -330,6 +330,13 @@ impl RowAppendRecordBatchBuilder {
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
}
arrow_schema::DataType::List(field) => {
let inner_builder = Self::create_builder(field.data_type(), capacity)?;
Ok(Box::new(ListBuilder::with_capacity(
inner_builder,
capacity,
)))
}
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
),
Expand Down Expand Up @@ -1184,6 +1191,71 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
})
}

/// Converts an Arrow data type back to a Fluss `DataType`.
/// Used for reading array elements from Arrow ListArray back into Fluss types.
pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
use crate::metadata::DataTypes;

Ok(match arrow_type {
ArrowDataType::Boolean => DataTypes::boolean(),
ArrowDataType::Int8 => DataTypes::tinyint(),
ArrowDataType::Int16 => DataTypes::smallint(),
ArrowDataType::Int32 => DataTypes::int(),
ArrowDataType::Int64 => DataTypes::bigint(),
ArrowDataType::Float32 => DataTypes::float(),
ArrowDataType::Float64 => DataTypes::double(),
ArrowDataType::Utf8 => DataTypes::string(),
ArrowDataType::Binary => DataTypes::bytes(),
ArrowDataType::Date32 => DataTypes::date(),
ArrowDataType::FixedSizeBinary(len) => {
if *len < 0 {
return Err(Error::IllegalArgument {
message: format!("FixedSizeBinary length must be >= 0, got {len}"),
});
}
DataTypes::binary(*len as usize)
}
ArrowDataType::Decimal128(p, s) => {
if *s < 0 {
return Err(Error::IllegalArgument {
message: format!("Decimal scale must be >= 0, got {s}"),
});
}
DataTypes::decimal(*p as u32, *s as u32)
}
ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => DataTypes::time_with_precision(0),
ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
DataTypes::time_with_precision(3)
}
ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
DataTypes::time_with_precision(6)
}
ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
DataTypes::time_with_precision(9)
}
ArrowDataType::Timestamp(unit, tz) => {
let precision = match unit {
arrow_schema::TimeUnit::Second => 0,
arrow_schema::TimeUnit::Millisecond => 3,
arrow_schema::TimeUnit::Microsecond => 6,
arrow_schema::TimeUnit::Nanosecond => 9,
};

if tz.is_some() {
DataTypes::timestamp_ltz_with_precision(precision)
} else {
DataTypes::timestamp_with_precision(precision)
}
}
ArrowDataType::List(field) => DataTypes::array(from_arrow_type(field.data_type())?),
other => {
return Err(Error::IllegalArgument {
message: format!("Cannot convert Arrow type to Fluss type: {other:?}"),
});
}
})
}

#[derive(Clone)]
pub struct ReadContext {
target_schema: SchemaRef,
Expand Down
10 changes: 7 additions & 3 deletions crates/fluss/src/row/binary/binary_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ pub trait BinaryWriter {

fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32);

// TODO InternalArray, ArraySerializer
// fn write_array(&mut self, pos: i32, value: i64);
fn write_array(&mut self, value: &[u8]);

// TODO Row serializer
// fn write_row(&mut self, pos: i32, value: &InternalRow);
Expand Down Expand Up @@ -136,7 +135,8 @@ pub enum InnerValueWriter {
Time(u32), // precision (not used in wire format, but kept for consistency)
TimestampNtz(u32), // precision
TimestampLtz(u32), // precision
// TODO Array, Row
Array,
// TODO Row
}

/// Accessor for writing the fields/elements of a binary writer during runtime, the
Expand Down Expand Up @@ -175,6 +175,7 @@ impl InnerValueWriter {
// Validation is done at TimestampLTzType construction time
Ok(InnerValueWriter::TimestampLtz(t.precision()))
}
DataType::Array(_) => Ok(InnerValueWriter::Array),
_ => unimplemented!(
"ValueWriter for DataType {:?} is currently not implemented",
data_type
Expand Down Expand Up @@ -237,6 +238,9 @@ impl InnerValueWriter {
(InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
writer.write_timestamp_ltz(ts, *p);
}
(InnerValueWriter::Array, Datum::Array(arr)) => {
writer.write_array(arr.as_bytes());
}
_ => {
return Err(IllegalArgument {
message: format!("{self:?} used to write value {value:?}"),
Expand Down
Loading