Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
- uses: actions/checkout@v4

- name: Test
run: cargo test --all-targets --workspace
run: cargo test -p paimon --all-targets
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
Expand All @@ -90,6 +90,12 @@ jobs:
- name: Start Docker containers
run: make docker-up

- name: Integration Test
run: cargo test -p paimon-integration-tests --all-targets
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full

- name: Stop Docker containers
if: always()
run: make docker-down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[workspace]
resolver = "2"
members = ["crates/paimon"]
members = ["crates/paimon", "crates/integration_tests"]

[workspace.package]
version = "0.0.0"
Expand All @@ -26,3 +26,7 @@ homepage = "https://paimon.apache.org/"
repository = "https://github.com/apache/paimon-rust"
license = "Apache-2.0"
rust-version = "1.86.0"

[workspace.dependencies]
arrow-array = "57.0"
parquet = "57.0"
30 changes: 30 additions & 0 deletions crates/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "paimon-integration-tests"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true

[dependencies]
paimon = { path = "../paimon" }
arrow-array = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
104 changes: 104 additions & 0 deletions crates/integration_tests/tests/read_log_tables.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration tests for reading Paimon log tables (system tables).
//!
//! Paimon log tables are system tables that contain metadata about the table,
//! such as snapshots, manifests, schemas, etc. They are stored as Parquet files
//! and can be read using the Arrow reader.

use arrow_array::{Int32Array, StringArray};
use futures::TryStreamExt;
use paimon::catalog::Identifier;
use paimon::{Catalog, FileSystemCatalog};

/// Resolve the warehouse directory used by the integration tests.
///
/// Reads the `PAIMON_TEST_WAREHOUSE` environment variable; when it is unset
/// (or not valid unicode) the default `/tmp/paimon-warehouse` is returned.
fn get_test_warehouse() -> String {
    match std::env::var("PAIMON_TEST_WAREHOUSE") {
        Ok(path) => path,
        Err(_) => String::from("/tmp/paimon-warehouse"),
    }
}

/// Verify that a pre-populated table can be scanned and read back via Arrow.
///
/// The fixture table `default.simple_log_table` was populated with:
/// (1, 'alice'), (2, 'bob'), (3, 'carol')
#[tokio::test]
async fn test_read_log_table() {
    let catalog =
        FileSystemCatalog::new(get_test_warehouse()).expect("Failed to create catalog");

    // Look up the fixture table in the default database.
    let ident = Identifier::new("default", "simple_log_table");
    let table = catalog
        .get_table(&ident)
        .await
        .expect("Failed to get table");

    // Build a full-table scan and the matching reader.
    let builder = table.new_read_builder();
    let reader = builder.new_read().expect("Failed to create read");
    let plan = builder
        .new_scan()
        .plan()
        .await
        .expect("Failed to plan scan");

    // Stream all splits out as Arrow record batches and collect them.
    let batches: Vec<_> = reader
        .to_arrow(plan.splits())
        .expect("Failed to create arrow stream")
        .try_collect()
        .await
        .expect("Failed to collect batches");

    assert!(
        !batches.is_empty(),
        "Expected at least one batch from table"
    );

    // Flatten every batch into (id, name) pairs.
    let mut actual_rows: Vec<(i32, String)> = Vec::new();
    for batch in &batches {
        let ids = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("Expected Int32Array for id column");
        let names = batch
            .column_by_name("name")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("Expected StringArray for name column");
        for row in 0..batch.num_rows() {
            actual_rows.push((ids.value(row), names.value(row).to_string()));
        }
    }

    // Row order is not guaranteed by the scan, so normalize before comparing.
    actual_rows.sort_by_key(|&(id, _)| id);

    let expected_rows = vec![
        (1, "alice".to_string()),
        (2, "bob".to_string()),
        (3, "carol".to_string()),
    ];
    assert_eq!(
        actual_rows, expected_rows,
        "Rows should match expected values"
    );
}
6 changes: 5 additions & 1 deletion crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ snafu = "0.8.3"
typed-builder = "^0.19"
opendal = { version = "0.49", features = ["services-fs"] }
pretty_assertions = "1"
apache-avro = { version = "0.17", features = ["snappy"] }
apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
indexmap = "2.5.0"
roaring = "0.10"
arrow-array = { workspace = true }
futures = "0.3"
parquet = { workspace = true, features = ["async", "zstd"] }
async-stream = "0.3.6"

[dev-dependencies]
rand = "0.8.5"
Expand Down
20 changes: 20 additions & 0 deletions crates/paimon/src/arrow/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Arrow integration module: the reader turns paimon data splits into Arrow
// RecordBatch streams. Only the builder entry point is re-exported.
mod reader;

pub use crate::arrow::reader::ArrowReaderBuilder;
170 changes: 170 additions & 0 deletions crates/paimon/src/arrow/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::io::{FileIO, FileRead, FileStatus};
use crate::table::ArrowRecordBatchStream;
use crate::{DataSplit, Error};
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryFutureExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::ParquetMetaDataReader;
use std::ops::Range;
use std::sync::Arc;
use tokio::try_join;

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
    // Optional row count per RecordBatch; when set it is forwarded to the
    // parquet record-batch stream builder (see `ArrowReader::read`).
    batch_size: Option<usize>,
    // File IO handle used to open the parquet data files.
    file_io: FileIO,
}

impl ArrowReaderBuilder {
    /// Create a new ArrowReaderBuilder
    ///
    /// The batch size starts unset, which defers to the parquet reader's
    /// default batching behavior.
    pub(crate) fn new(file_io: FileIO) -> Self {
        Self {
            file_io,
            batch_size: None,
        }
    }

    /// Build the ArrowReader.
    pub fn build(self) -> ArrowReader {
        let Self {
            batch_size,
            file_io,
        } = self;
        ArrowReader {
            batch_size,
            file_io,
        }
    }
}

/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    // Optional row count per RecordBatch, applied via `with_batch_size`
    // on the parquet stream builder when present.
    batch_size: Option<usize>,
    // File IO handle used to open the parquet data files.
    file_io: FileIO,
}

impl ArrowReader {
/// Take a stream of DataSplits and read every data file in each split.
/// Returns a stream of Arrow RecordBatches from all files.
pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let paths_to_read: Vec<String> = data_splits
.iter()
.flat_map(|ds| ds.data_file_entries().map(|(p, _)| p))
.map(|p| {
if !p.to_ascii_lowercase().ends_with(".parquet") {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only parquet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's only support parquet now. We can support more in next following pr.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

Copy link

@XiaoHongbo-Hope XiaoHongbo-Hope Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can get your point, only parquet is supported. But I am afraid that this check is not enough. See DataFilePathFactory.formatIdentifier. Or can we have a better way to get the format.

Copy link
Contributor Author

@luoyuxia luoyuxia Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concern. However, for parquet files we normally do not expect extra compression suffixes like .gz, since compression is handled within the parquet format itself.

So for the current parquet-only read path, I think checking for .parquet should be sufficient. If we later need to support other file naming conventions, we can revisit this and align with DataFilePathFactory.formatIdentifier.

Btw, From the Python side, it seems we only inspect the last extension via os.path.splitext(). That means a path like *.json.gz would be identified as gz, not json.

So the current behavior there does not appear to handle compressed suffixes in a generalized way either.

For parquet, this is less of a concern since we normally do not expect an extra compression suffix such as .parquet.gz.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your very kind explanation. And thanks again for pointing out the issue in python side, I wiil check it.

Err(Error::Unsupported {
message: format!(
"unsupported file format: only .parquet is supported, got: {p}"
),
})
} else {
Ok(p)
}
})
.collect::<Result<Vec<_>, _>>()?;
Ok(try_stream! {
for path_to_read in paths_to_read {
let parquet_file = file_io.new_input(&path_to_read)?;

let (parquet_metadata, parquet_reader) = try_join!(
parquet_file.metadata(),
parquet_file.reader()
)?;

let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;

if let Some(size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(size);
}

let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.next().await {
yield batch?
}
}
}
.boxed())
}
}

/// ArrowFileReader is a wrapper around a `FileRead` that implements parquet's `AsyncFileReader`.
///
/// # TODO
///
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
/// contains the following hints to speed up metadata loading, similar to iceberg, we can consider adding them to this struct:
///
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
struct ArrowFileReader<R: FileRead> {
    // File status; its `size` is needed to locate and parse the footer
    // in `get_metadata`.
    meta: FileStatus,
    // Underlying ranged reader for the file's bytes.
    r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
    /// Create a new ArrowFileReader
    fn new(meta: FileStatus, r: R) -> Self {
        Self { meta, r }
    }

    /// Read the given byte range from the underlying file, mapping any IO
    /// error into a `ParquetError` as the parquet reader traits require.
    fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let fut = self.r.read(range.start..range.end).map_err(|err| {
            parquet::errors::ParquetError::External(format!("{err}").into())
        });
        Box::pin(fut)
    }
}

impl<R: FileRead> MetadataFetch for ArrowFileReader<R> {
    // Footer/metadata fetches use the same ranged-read path as data reads.
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.read_bytes(range)
    }
}

impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
    // Data-page reads delegate to the shared ranged-read helper.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.read_bytes(range)
    }

    // Load the parquet metadata by parsing the file footer.
    fn get_metadata(
        &mut self,
        options: Option<&ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        // Clone the metadata options out of `options` so the returned future
        // does not borrow from the caller-supplied reference.
        let metadata_opts = options.map(|o| o.metadata_options().clone());
        Box::pin(async move {
            // The footer parser needs the total file size to locate the
            // footer; we already have it from the FileStatus captured when
            // the file was opened.
            let file_size = self.meta.size;
            let metadata = ParquetMetaDataReader::new()
                .with_metadata_options(metadata_opts)
                .load_and_finish(self, file_size)
                .await?;
            Ok(Arc::new(metadata))
        })
    }
}
1 change: 1 addition & 0 deletions crates/paimon/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod filesystem;
use std::collections::HashMap;
use std::fmt;

pub use filesystem::*;
use serde::{Deserialize, Serialize};

/// Splitter for system table names (e.g. `table$snapshots`).
Expand Down
Loading
Loading