diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7135e15..42128d6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: - uses: actions/checkout@v4 - name: Test - run: cargo test --all-targets --workspace + run: cargo test -p paimon --all-targets env: RUST_LOG: DEBUG RUST_BACKTRACE: full @@ -90,6 +90,12 @@ jobs: - name: Start Docker containers run: make docker-up + - name: Integration Test + run: cargo test -p paimon-integration-tests --all-targets + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full + - name: Stop Docker containers if: always() run: make docker-down diff --git a/Cargo.toml b/Cargo.toml index f589335..0ae4fa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] resolver = "2" -members = ["crates/paimon"] +members = ["crates/paimon", "crates/integration_tests"] [workspace.package] version = "0.0.0" @@ -26,3 +26,7 @@ homepage = "https://paimon.apache.org/" repository = "https://github.com/apache/paimon-rust" license = "Apache-2.0" rust-version = "1.86.0" + +[workspace.dependencies] +arrow-array = "57.0" +parquet = "57.0" \ No newline at end of file diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml new file mode 100644 index 0000000..a4753bf --- /dev/null +++ b/crates/integration_tests/Cargo.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "paimon-integration-tests" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true + +[dependencies] +paimon = { path = "../paimon" } +arrow-array = { workspace = true } +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +futures = "0.3" diff --git a/crates/integration_tests/tests/read_log_tables.rs b/crates/integration_tests/tests/read_log_tables.rs new file mode 100644 index 0000000..e7b42f7 --- /dev/null +++ b/crates/integration_tests/tests/read_log_tables.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for reading Paimon log tables (system tables). +//! +//! Paimon log tables are system tables that contain metadata about the table, +//! 
such as snapshots, manifests, schemas, etc. They are stored as Parquet files +//! and can be read using the Arrow reader. + +use arrow_array::{Int32Array, StringArray}; +use futures::TryStreamExt; +use paimon::catalog::Identifier; +use paimon::{Catalog, FileSystemCatalog}; + +/// Get the test warehouse path from environment variable or use default. +fn get_test_warehouse() -> String { + std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) +} + +/// Test reading a table and verifying the data matches expected values. +/// +/// The table was populated with: (1, 'alice'), (2, 'bob'), (3, 'carol') +#[tokio::test] +async fn test_read_log_table() { + let warehouse = get_test_warehouse(); + let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); + + // Get the table + let identifier = Identifier::new("default", "simple_log_table"); + + let table = catalog + .get_table(&identifier) + .await + .expect("Failed to get table"); + + // Scan the table + let read_builder = table.new_read_builder(); + let read = read_builder.new_read().expect("Failed to create read"); + let scan = read_builder.new_scan(); + + let plan = scan.plan().await.expect("Failed to plan scan"); + + // Read to Arrow + let stream = read + .to_arrow(plan.splits()) + .expect("Failed to create arrow stream"); + + let batches: Vec<_> = stream + .try_collect() + .await + .expect("Failed to collect batches"); + + assert!( + !batches.is_empty(), + "Expected at least one batch from table" + ); + + // Collect all rows as (id, name) tuples + let mut actual_rows: Vec<(i32, String)> = Vec::new(); + + for batch in &batches { + let id_array = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::<Int32Array>()) + .expect("Expected Int32Array for id column"); + let name_array = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("Expected StringArray for name column"); + + for i in 0..batch.num_rows() {
actual_rows.push((id_array.value(i), name_array.value(i).to_string())); + } + } + + // Expected data: (1, 'alice'), (2, 'bob'), (3, 'carol') + let expected_rows = vec![ + (1, "alice".to_string()), + (2, "bob".to_string()), + (3, "carol".to_string()), + ]; + + // Sort for consistent comparison + actual_rows.sort_by_key(|(id, _)| *id); + + assert_eq!( + actual_rows, expected_rows, + "Rows should match expected values" + ); +} diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index d0050de..4f6c6a6 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -49,9 +49,13 @@ snafu = "0.8.3" typed-builder = "^0.19" opendal = { version = "0.49", features = ["services-fs"] } pretty_assertions = "1" -apache-avro = { version = "0.17", features = ["snappy"] } +apache-avro = { version = "0.17", features = ["snappy", "zstandard"] } indexmap = "2.5.0" roaring = "0.10" +arrow-array = { workspace = true } +futures = "0.3" +parquet = { workspace = true, features = ["async", "zstd"] } +async-stream = "0.3.6" [dev-dependencies] rand = "0.8.5" diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs new file mode 100644 index 0000000..0ed18b8 --- /dev/null +++ b/crates/paimon/src/arrow/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +mod reader; + +pub use crate::arrow::reader::ArrowReaderBuilder; diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs new file mode 100644 index 0000000..209874f --- /dev/null +++ b/crates/paimon/src/arrow/reader.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::io::{FileIO, FileRead, FileStatus}; +use crate::table::ArrowRecordBatchStream; +use crate::{DataSplit, Error}; +use async_stream::try_stream; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{StreamExt, TryFutureExt}; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch}; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::ParquetMetaDataReader; +use std::ops::Range; +use std::sync::Arc; +use tokio::try_join; + +/// Builder to create ArrowReader +pub struct ArrowReaderBuilder { + batch_size: Option<usize>, + file_io: FileIO, +} + +impl ArrowReaderBuilder { + /// Create a new ArrowReaderBuilder + pub(crate) fn new(file_io: FileIO) -> Self { + ArrowReaderBuilder { + batch_size: None, + file_io, + } + } + + /// Build the ArrowReader. + pub fn build(self) -> ArrowReader { + ArrowReader { + batch_size: self.batch_size, + file_io: self.file_io, + } + } +} + +/// Reads data from Parquet files +#[derive(Clone)] +pub struct ArrowReader { + batch_size: Option<usize>, + file_io: FileIO, +} + +impl ArrowReader { + /// Take a stream of DataSplits and read every data file in each split. + /// Returns a stream of Arrow RecordBatches from all files. + pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> { + let file_io = self.file_io.clone(); + let batch_size = self.batch_size; + let paths_to_read: Vec<String> = data_splits + .iter() + .flat_map(|ds| ds.data_file_entries().map(|(p, _)| p)) + .map(|p| { + if !p.to_ascii_lowercase().ends_with(".parquet") { + Err(Error::Unsupported { + message: format!( + "unsupported file format: only .parquet is supported, got: {p}" + ), + }) + } else { + Ok(p) + } + }) + .collect::<Result<Vec<_>, _>>()?; + Ok(try_stream!
{ + for path_to_read in paths_to_read { + let parquet_file = file_io.new_input(&path_to_read)?; + + let (parquet_metadata, parquet_reader) = try_join!( + parquet_file.metadata(), + parquet_file.reader() + )?; + + let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader); + + let mut batch_stream_builder = + ParquetRecordBatchStreamBuilder::new(arrow_file_reader) + .await?; + + if let Some(size) = batch_size { + batch_stream_builder = batch_stream_builder.with_batch_size(size); + } + + let mut batch_stream = batch_stream_builder.build()?; + + while let Some(batch) = batch_stream.next().await { + yield batch? + } + } + } + .boxed()) + } +} + +/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. +/// +/// # TODO +/// +/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) +/// contains the following hints to speed up metadata loading, similar to iceberg, we can consider adding them to this struct: +/// +/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer. +/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`]. +/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`]. 
+struct ArrowFileReader<R: FileRead> { + meta: FileStatus, + r: R, +} + +impl<R: FileRead> ArrowFileReader<R> { + /// Create a new ArrowFileReader + fn new(meta: FileStatus, r: R) -> Self { + Self { meta, r } + } + + fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { + Box::pin(self.r.read(range.start..range.end).map_err(|err| { + let err_msg = format!("{err}"); + parquet::errors::ParquetError::External(err_msg.into()) + })) + } +} + +impl<R: FileRead> MetadataFetch for ArrowFileReader<R> { + fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { + self.read_bytes(range) + } +} + +impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> { + fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> { + self.read_bytes(range) + } + + fn get_metadata( + &mut self, + options: Option<&ArrowReaderOptions>, + ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> { + let metadata_opts = options.map(|o| o.metadata_options().clone()); + Box::pin(async move { + let file_size = self.meta.size; + let metadata = ParquetMetaDataReader::new() + .with_metadata_options(metadata_opts) + .load_and_finish(self, file_size) + .await?; + Ok(Arc::new(metadata)) + }) + } +} diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs index 086e98b..4b43ffa 100644 --- a/crates/paimon/src/catalog/mod.rs +++ b/crates/paimon/src/catalog/mod.rs @@ -25,6 +25,7 @@ mod filesystem; use std::collections::HashMap; use std::fmt; +pub use filesystem::*; use serde::{Deserialize, Serialize}; /// Splitter for system table names (e.g. `table$snapshots`).
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index 88cf5f5..6c744b4 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -82,6 +82,15 @@ pub enum Error { )] FileIndexFormatInvalid { message: String }, + #[snafu( + visibility(pub(crate)), + display("Paimon hitting unexpected parquet error: {}", message) + )] + ParquetDataUnexpected { + message: String, + source: Box<parquet::errors::ParquetError>, + }, + // ======================= catalog errors =============================== #[snafu(display("Database {} already exists.", database))] DatabaseAlreadyExist { database: String }, @@ -119,3 +128,12 @@ impl From for Error { } } } + +impl From<parquet::errors::ParquetError> for Error { + fn from(source: parquet::errors::ParquetError) -> Self { + Error::ParquetDataUnexpected { + message: format!("Failed to read a Parquet file: {source}"), + source: Box::new(source), + } + } +} diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index b3095a3..d7a8983 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -19,12 +19,17 @@ mod error; pub use error::Error; pub use error::Result; +mod arrow; pub mod catalog; mod deletion_vector; pub mod file_index; pub mod io; pub mod spec; -mod table; +pub mod table; + +pub use catalog::Catalog; +pub use catalog::FileSystemCatalog; + pub use table::{ DataSplit, DataSplitBuilder, Plan, ReadBuilder, SnapshotManager, Table, TableRead, TableScan, }; diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs index 4001638..e0ba04a 100644 --- a/crates/paimon/src/spec/types.rs +++ b/crates/paimon/src/spec/types.rs @@ -1237,13 +1237,22 @@ impl Default for VarCharType { } } +/// Alias for variable-length string used in schema JSON (Java: VarCharType.STRING_TYPE).
+const STRING_TYPE_NAME: &str = "STRING"; + impl FromStr for VarCharType { type Err = Error; fn from_str(s: &str) -> Result<Self> { + let s = s.trim(); + if s.starts_with(STRING_TYPE_NAME) { + let nullable = !s.contains("NOT NULL"); + return VarCharType::with_nullable(nullable, Self::MAX_LENGTH); + } if !s.starts_with(serde_utils::VARCHAR::NAME) { return DataTypeInvalidSnafu { - message: "Invalid VARCHAR type. Expected string to start with 'VARCHAR'.", + message: + "Invalid VARCHAR type. Expected string to start with 'VARCHAR' or 'STRING'.", } .fail(); } diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 4011f52..0e972b5 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -22,6 +22,9 @@ mod snapshot_manager; mod source; mod table_scan; +use crate::Result; +use arrow_array::RecordBatch; +use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; pub use snapshot_manager::SnapshotManager; pub use source::{DataSplit, DataSplitBuilder, Plan}; @@ -84,3 +87,6 @@ impl Table { ReadBuilder::new(self) } } + +/// A stream of arrow [`RecordBatch`]es. +pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>; diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 14f56b1..4054aa7 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -20,10 +20,11 @@ //! Reference: [pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py) //! and [pypaimon.table.file_store_table.FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
+use super::{ArrowRecordBatchStream, Table, TableScan}; +use crate::arrow::ArrowReaderBuilder; use crate::spec::DataField; use crate::Result; - -use super::{Table, TableScan}; +use crate::{DataSplit, Error}; /// Builder for table scan and table read (new_scan, new_read). /// @@ -72,4 +73,27 @@ impl<'a> TableRead<'a> { pub fn table(&self) -> &Table { self.table } + + /// Returns an [`ArrowRecordBatchStream`]. + pub fn to_arrow(&self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> { + // todo: consider get read batch size from table + if !self.table.schema.primary_keys().is_empty() { + return Err(Error::Unsupported { + message: format!( + "Reading tables with primary keys is not yet supported. Primary keys: {:?}", + self.table.schema.primary_keys() + ), + }); + } + if !self.table.schema.partition_keys().is_empty() { + return Err(Error::Unsupported { + message: format!( + "Reading partitioned tables is not yet supported. Partition keys: {:?}", + self.table.schema.partition_keys() + ), + }); + } + let arrow_reader_builder = ArrowReaderBuilder::new(self.table.file_io.clone()).build(); + arrow_reader_builder.read(data_splits) + } } diff --git a/crates/paimon/src/table/source.rs b/crates/paimon/src/table/source.rs index 10486f9..420b662 100644 --- a/crates/paimon/src/table/source.rs +++ b/crates/paimon/src/table/source.rs @@ -59,6 +59,18 @@ impl DataSplit { &self.data_files } + /// Iterate over each data file in this split, yielding `(path, &DataFileMeta)`. + /// Use this to read each data file one by one (e.g. in ArrowReader). + pub fn data_file_entries(&self) -> impl Iterator<Item = (String, &DataFileMeta)> + '_ { + let base = self.bucket_path.trim_end_matches('/'); + // todo: consider partition table + // todo: consider external path + self.data_files.iter().map(move |file| { + let path = format!("{}/{}", base, file.file_name); + (path, file) + }) + } + /// Total row count of all data files in this split.
pub fn row_count(&self) -> i64 { self.data_files.iter().map(|f| f.row_count).sum() diff --git a/deny.toml b/deny.toml index 943c3f8..c11cec6 100644 --- a/deny.toml +++ b/deny.toml @@ -22,6 +22,7 @@ allow = [ "BSD-2-Clause", "BSD-3-Clause", "ISC", + "CC0-1.0", "MIT", "Unicode-3.0", "Zlib", diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 00e3cca..435a5b2 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -28,15 +28,14 @@ def main(): # Use Paimon catalog (configured in spark-defaults.conf with warehouse file:/tmp/paimon-warehouse) spark.sql("USE paimon.default") - # Table: simple keyed table for read tests + # Table: simple log table for read tests spark.sql(""" - CREATE TABLE IF NOT EXISTS simple ( + CREATE TABLE IF NOT EXISTS simple_log_table ( id INT, name STRING ) USING paimon - TBLPROPERTIES ('primary-key' = 'id') """) - spark.sql("INSERT INTO simple VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')") + spark.sql("INSERT INTO simple_log_table VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')") if __name__ == "__main__": main()