Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ version = { workspace = true }
fluss = { workspace = true, features = ["storage-all"] }
tokio = { workspace = true }
clap = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

[[example]]
name = "example-table"
path = "src/example_table.rs"
Expand Down
4 changes: 4 additions & 0 deletions crates/examples/src/example_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.

// Install jemalloc as the process-wide allocator for this example binary.
// Gated on non-MSVC targets, matching the cfg under which the
// `tikv-jemallocator` dependency is declared in this crate's Cargo.toml.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

mod example_kv_table;
mod example_partitioned_kv_table;

Expand Down
1 change: 0 additions & 1 deletion crates/fluss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,5 @@ jiff = { workspace = true, features = ["js"] }
testcontainers = "0.25.0"
test-env-helpers = "0.2.2"


[build-dependencies]
prost-build = { version = "0.14" }
19 changes: 19 additions & 0 deletions crates/fluss/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@
//! Ok(())
//! }
//! ```
//!
//! # Performance
//!
//! For production deployments on Linux, we recommend using
//! [jemalloc](https://crates.io/crates/tikv-jemallocator) as the global allocator.
//! The default glibc allocator (ptmalloc2) can cause resident set size (RSS) bloat and heap
//! fragmentation under sustained write loads, because Arrow batch building repeatedly
//! allocates and frees buffers of the same size.
//! jemalloc's thread-local size-class bins handle this pattern efficiently.
//! The configuration below enables jemalloc on every non-MSVC target.
//!
//! ```toml
//! [target.'cfg(not(target_env = "msvc"))'.dependencies]
//! tikv-jemallocator = "0.6"
//! ```
//!
//! ```rust,ignore
//! #[cfg(not(target_env = "msvc"))]
//! #[global_allocator]
//! static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
//! ```

pub mod client;
pub mod metadata;
Expand Down
96 changes: 61 additions & 35 deletions crates/fluss/src/record/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ pub const NO_BATCH_SEQUENCE: i32 = -1;

pub const BUILDER_DEFAULT_OFFSET: i64 = 0;

// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap.
pub const DEFAULT_MAX_RECORD: i32 = 256;

/// Estimated average byte size of a single value in a variable-width column (Utf8, Binary).
///
/// Multiplied by the row capacity to pre-size the data buffer of string and binary
/// builders, which reduces reallocations during batch building. This is only an
/// estimate: columns whose values are larger on average may still grow the buffer.
const VARIABLE_WIDTH_AVG_BYTES: usize = 64;

pub struct MemoryLogRecordsArrowBuilder {
base_log_offset: i64,
schema_id: i32,
Expand Down Expand Up @@ -236,11 +241,12 @@ pub struct RowAppendRecordBatchBuilder {

impl RowAppendRecordBatchBuilder {
pub fn new(row_type: &RowType) -> Result<Self> {
let capacity = DEFAULT_MAX_RECORD as usize;
let schema_ref = to_arrow_schema(row_type)?;
let builders: Result<Vec<_>> = schema_ref
.fields()
.iter()
.map(|field| Self::create_builder(field.data_type()))
.map(|field| Self::create_builder(field.data_type(), capacity))
.collect();
let field_getters = FieldGetter::create_field_getters(row_type);
Ok(Self {
Expand All @@ -251,26 +257,41 @@ impl RowAppendRecordBatchBuilder {
})
}

fn create_builder(data_type: &arrow_schema::DataType) -> Result<Box<dyn ArrayBuilder>> {
fn create_builder(
data_type: &arrow_schema::DataType,
capacity: usize,
) -> Result<Box<dyn ArrayBuilder>> {
match data_type {
arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())),
arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())),
arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())),
arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())),
arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())),
arrow_schema::DataType::UInt16 => Ok(Box::new(UInt16Builder::new())),
arrow_schema::DataType::UInt32 => Ok(Box::new(UInt32Builder::new())),
arrow_schema::DataType::UInt64 => Ok(Box::new(UInt64Builder::new())),
arrow_schema::DataType::Float32 => Ok(Box::new(Float32Builder::new())),
arrow_schema::DataType::Float64 => Ok(Box::new(Float64Builder::new())),
arrow_schema::DataType::Boolean => Ok(Box::new(BooleanBuilder::new())),
arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
arrow_schema::DataType::Binary => Ok(Box::new(BinaryBuilder::new())),
arrow_schema::DataType::FixedSizeBinary(size) => {
Ok(Box::new(FixedSizeBinaryBuilder::new(*size)))
arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::with_capacity(capacity))),
arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::with_capacity(capacity))),
arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::with_capacity(capacity))),
arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::with_capacity(capacity))),
arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::with_capacity(capacity))),
arrow_schema::DataType::UInt16 => Ok(Box::new(UInt16Builder::with_capacity(capacity))),
arrow_schema::DataType::UInt32 => Ok(Box::new(UInt32Builder::with_capacity(capacity))),
arrow_schema::DataType::UInt64 => Ok(Box::new(UInt64Builder::with_capacity(capacity))),
arrow_schema::DataType::Float32 => {
Ok(Box::new(Float32Builder::with_capacity(capacity)))
}
arrow_schema::DataType::Float64 => {
Ok(Box::new(Float64Builder::with_capacity(capacity)))
}
arrow_schema::DataType::Boolean => {
Ok(Box::new(BooleanBuilder::with_capacity(capacity)))
}
arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::with_capacity(
capacity,
capacity * VARIABLE_WIDTH_AVG_BYTES,
))),
arrow_schema::DataType::Binary => Ok(Box::new(BinaryBuilder::with_capacity(
capacity,
capacity * VARIABLE_WIDTH_AVG_BYTES,
))),
arrow_schema::DataType::FixedSizeBinary(size) => Ok(Box::new(
FixedSizeBinaryBuilder::with_capacity(capacity, *size),
)),
arrow_schema::DataType::Decimal128(precision, scale) => {
let builder = Decimal128Builder::new()
let builder = Decimal128Builder::with_capacity(capacity)
.with_precision_and_scale(*precision, *scale)
.map_err(|e| Error::IllegalArgument {
message: format!(
Expand All @@ -279,11 +300,13 @@ impl RowAppendRecordBatchBuilder {
})?;
Ok(Box::new(builder))
}
arrow_schema::DataType::Date32 => Ok(Box::new(Date32Builder::new())),
arrow_schema::DataType::Date32 => Ok(Box::new(Date32Builder::with_capacity(capacity))),
arrow_schema::DataType::Time32(unit) => match unit {
arrow_schema::TimeUnit::Second => Ok(Box::new(Time32SecondBuilder::new())),
arrow_schema::TimeUnit::Second => {
Ok(Box::new(Time32SecondBuilder::with_capacity(capacity)))
}
arrow_schema::TimeUnit::Millisecond => {
Ok(Box::new(Time32MillisecondBuilder::new()))
Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity)))
}
_ => Err(Error::IllegalArgument {
message: format!(
Expand All @@ -293,27 +316,29 @@ impl RowAppendRecordBatchBuilder {
},
arrow_schema::DataType::Time64(unit) => match unit {
arrow_schema::TimeUnit::Microsecond => {
Ok(Box::new(Time64MicrosecondBuilder::new()))
Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity)))
}
arrow_schema::TimeUnit::Nanosecond => {
Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity)))
}
arrow_schema::TimeUnit::Nanosecond => Ok(Box::new(Time64NanosecondBuilder::new())),
_ => Err(Error::IllegalArgument {
message: format!(
"Time64 only supports Microsecond and Nanosecond units, got: {unit:?}"
),
}),
},
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
Ok(Box::new(TimestampSecondBuilder::new()))
}
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
Ok(Box::new(TimestampMillisecondBuilder::new()))
}
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
Ok(Box::new(TimestampMicrosecondBuilder::new()))
}
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
Ok(Box::new(TimestampNanosecondBuilder::new()))
Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
}
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
),
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Ok(
Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)),
),
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Ok(
Box::new(TimestampNanosecondBuilder::with_capacity(capacity)),
),
dt => Err(Error::IllegalArgument {
message: format!("Unsupported data type: {dt:?}"),
}),
Expand Down Expand Up @@ -1701,7 +1726,8 @@ mod tests {

// Test valid builder creation with precision=10, scale=2
let mut builder =
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2)).unwrap();
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2), 256)
.unwrap();
let decimal_builder = builder
.as_any_mut()
.downcast_mut::<Decimal128Builder>()
Expand All @@ -1712,7 +1738,7 @@ mod tests {

// Test error case: invalid precision/scale
let result =
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 50));
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 50), 256);
assert!(result.is_err());
}

Expand Down
Loading