diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 26251cc7..b9187395 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -27,6 +27,10 @@ version = { workspace = true } fluss = { workspace = true, features = ["storage-all"] } tokio = { workspace = true } clap = { workspace = true } + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.6" + [[example]] name = "example-table" path = "src/example_table.rs" diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index e4ad1fbd..49f0ab4c 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + mod example_kv_table; mod example_partitioned_kv_table; diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index db1348a0..c0ba6f8d 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -80,6 +80,5 @@ jiff = { workspace = true, features = ["js"] } testcontainers = "0.25.0" test-env-helpers = "0.2.2" - [build-dependencies] prost-build = { version = "0.14" } diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index cd060c8e..13e85981 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -101,6 +101,25 @@ //! Ok(()) //! } //! ``` +//! +//! # Performance +//! +//! For production deployments on Linux, we recommend using +//! [jemalloc](https://crates.io/crates/tikv-jemallocator) as the global allocator. +//! The default glibc allocator (ptmalloc2) can cause RSS bloat and fragmentation under +//! sustained write loads due to repeated same-size alloc/free cycles in Arrow batch building. +//! jemalloc's thread-local size-class bins handle this pattern efficiently. +//! +//! ```toml +//! [target.'cfg(not(target_env = "msvc"))'.dependencies] +//! tikv-jemallocator = "0.6" +//! ``` +//! +//! ```rust,ignore +//! #[cfg(not(target_env = "msvc"))] +//! #[global_allocator] +//! static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +//! ``` pub mod client; pub mod metadata; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index ea27836e..7fd16194 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -151,8 +151,13 @@ pub const NO_BATCH_SEQUENCE: i32 = -1; pub const BUILDER_DEFAULT_OFFSET: i64 = 0; +// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap. pub const DEFAULT_MAX_RECORD: i32 = 256; +/// Estimated average byte size for variable-width columns (Utf8, Binary). +/// Used to pre-allocate data buffers and avoid reallocations during batch building. +const VARIABLE_WIDTH_AVG_BYTES: usize = 64; + pub struct MemoryLogRecordsArrowBuilder { base_log_offset: i64, schema_id: i32, @@ -236,11 +241,12 @@ pub struct RowAppendRecordBatchBuilder { impl RowAppendRecordBatchBuilder { pub fn new(row_type: &RowType) -> Result { + let capacity = DEFAULT_MAX_RECORD as usize; let schema_ref = to_arrow_schema(row_type)?; let builders: Result> = schema_ref .fields() .iter() - .map(|field| Self::create_builder(field.data_type())) + .map(|field| Self::create_builder(field.data_type(), capacity)) .collect(); let field_getters = FieldGetter::create_field_getters(row_type); Ok(Self { @@ -251,26 +257,41 @@ impl RowAppendRecordBatchBuilder { }) } - fn create_builder(data_type: &arrow_schema::DataType) -> Result> { + fn create_builder( + data_type: &arrow_schema::DataType, + capacity: usize, + ) -> Result> { match data_type { - arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())), - arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())), - arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())), - arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())), - arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())), - arrow_schema::DataType::UInt16 => Ok(Box::new(UInt16Builder::new())), - arrow_schema::DataType::UInt32 => Ok(Box::new(UInt32Builder::new())), - arrow_schema::DataType::UInt64 => Ok(Box::new(UInt64Builder::new())), - arrow_schema::DataType::Float32 => Ok(Box::new(Float32Builder::new())), - arrow_schema::DataType::Float64 => Ok(Box::new(Float64Builder::new())), - arrow_schema::DataType::Boolean => Ok(Box::new(BooleanBuilder::new())), - arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())), - arrow_schema::DataType::Binary => Ok(Box::new(BinaryBuilder::new())), - arrow_schema::DataType::FixedSizeBinary(size) => { - Ok(Box::new(FixedSizeBinaryBuilder::new(*size))) + arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::with_capacity(capacity))), + arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::with_capacity(capacity))), + arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::with_capacity(capacity))), + arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::with_capacity(capacity))), + arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::with_capacity(capacity))), + arrow_schema::DataType::UInt16 => Ok(Box::new(UInt16Builder::with_capacity(capacity))), + arrow_schema::DataType::UInt32 => Ok(Box::new(UInt32Builder::with_capacity(capacity))), + arrow_schema::DataType::UInt64 => Ok(Box::new(UInt64Builder::with_capacity(capacity))), + arrow_schema::DataType::Float32 => { + Ok(Box::new(Float32Builder::with_capacity(capacity))) + } + arrow_schema::DataType::Float64 => { + Ok(Box::new(Float64Builder::with_capacity(capacity))) + } + arrow_schema::DataType::Boolean => { + Ok(Box::new(BooleanBuilder::with_capacity(capacity))) } + arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::with_capacity( + capacity, + capacity * VARIABLE_WIDTH_AVG_BYTES, + ))), + arrow_schema::DataType::Binary => Ok(Box::new(BinaryBuilder::with_capacity( + capacity, + capacity * VARIABLE_WIDTH_AVG_BYTES, + ))), + arrow_schema::DataType::FixedSizeBinary(size) => Ok(Box::new( + FixedSizeBinaryBuilder::with_capacity(capacity, *size), + )), arrow_schema::DataType::Decimal128(precision, scale) => { - let builder = Decimal128Builder::new() + let builder = Decimal128Builder::with_capacity(capacity) .with_precision_and_scale(*precision, *scale) .map_err(|e| Error::IllegalArgument { message: format!( @@ -279,11 +300,13 @@ impl RowAppendRecordBatchBuilder { })?; Ok(Box::new(builder)) } - arrow_schema::DataType::Date32 => Ok(Box::new(Date32Builder::new())), + arrow_schema::DataType::Date32 => Ok(Box::new(Date32Builder::with_capacity(capacity))), arrow_schema::DataType::Time32(unit) => match unit { - arrow_schema::TimeUnit::Second => Ok(Box::new(Time32SecondBuilder::new())), + arrow_schema::TimeUnit::Second => { + Ok(Box::new(Time32SecondBuilder::with_capacity(capacity))) + } arrow_schema::TimeUnit::Millisecond => { - Ok(Box::new(Time32MillisecondBuilder::new())) + Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity))) } _ => Err(Error::IllegalArgument { message: format!( @@ -293,9 +316,11 @@ impl RowAppendRecordBatchBuilder { }, arrow_schema::DataType::Time64(unit) => match unit { arrow_schema::TimeUnit::Microsecond => { - Ok(Box::new(Time64MicrosecondBuilder::new())) + Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity))) + } + arrow_schema::TimeUnit::Nanosecond => { + Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity))) } - arrow_schema::TimeUnit::Nanosecond => Ok(Box::new(Time64NanosecondBuilder::new())), _ => Err(Error::IllegalArgument { message: format!( "Time64 only supports Microsecond and Nanosecond units, got: {unit:?}" @@ -303,17 +328,17 @@ impl RowAppendRecordBatchBuilder { }), }, arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { - Ok(Box::new(TimestampSecondBuilder::new())) - } - arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { - Ok(Box::new(TimestampMillisecondBuilder::new())) - } - arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => { - Ok(Box::new(TimestampMicrosecondBuilder::new())) - } - arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => { - Ok(Box::new(TimestampNanosecondBuilder::new())) + Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity))) } + arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok( + Box::new(TimestampMillisecondBuilder::with_capacity(capacity)), + ), + arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Ok( + Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)), + ), + arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Ok( + Box::new(TimestampNanosecondBuilder::with_capacity(capacity)), + ), dt => Err(Error::IllegalArgument { message: format!("Unsupported data type: {dt:?}"), }), @@ -1701,7 +1726,8 @@ mod tests { // Test valid builder creation with precision=10, scale=2 let mut builder = - RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2)).unwrap(); + RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2), 256) + .unwrap(); let decimal_builder = builder .as_any_mut() .downcast_mut::() @@ -1712,7 +1738,7 @@ mod tests { // Test error case: invalid precision/scale let result = - RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 50)); + RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 50), 256); assert!(result.is_err()); }