-
Notifications
You must be signed in to change notification settings - Fork 46
feat: support read to arrow #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on | ||
| # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| [package] | ||
| name = "paimon-integration-tests" | ||
| version.workspace = true | ||
| edition.workspace = true | ||
| license.workspace = true | ||
| repository.workspace = true | ||
| homepage.workspace = true | ||
|
|
||
| [dependencies] | ||
| paimon = { path = "../paimon" } | ||
| arrow-array = { workspace = true } | ||
| tokio = { version = "1", features = ["macros", "rt-multi-thread"] } | ||
| futures = "0.3" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on | ||
| // an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Integration tests for reading Paimon log tables (system tables). | ||
| //! | ||
| //! Paimon log tables are system tables that contain metadata about the table, | ||
| //! such as snapshots, manifests, schemas, etc. They are stored as Parquet files | ||
| //! and can be read using the Arrow reader. | ||
|
|
||
| use arrow_array::{Int32Array, StringArray}; | ||
| use futures::TryStreamExt; | ||
| use paimon::catalog::Identifier; | ||
| use paimon::{Catalog, FileSystemCatalog}; | ||
|
|
||
/// Resolve the warehouse directory used by the integration tests.
///
/// Reads the `PAIMON_TEST_WAREHOUSE` environment variable and falls back to
/// `/tmp/paimon-warehouse` when the variable is not set (or not valid UTF-8).
fn get_test_warehouse() -> String {
    match std::env::var("PAIMON_TEST_WAREHOUSE") {
        Ok(path) => path,
        Err(_) => String::from("/tmp/paimon-warehouse"),
    }
}
|
|
||
/// Test reading a table and verifying the data matches expected values.
///
/// The table was populated with: (1, 'alice'), (2, 'bob'), (3, 'carol')
///
/// NOTE(review): requires an externally prepared warehouse containing the
/// `default.simple_log_table` table (see `get_test_warehouse`); the test
/// panics via `expect` if the warehouse or table is missing.
#[tokio::test]
async fn test_read_log_table() {
    let warehouse = get_test_warehouse();
    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");

    // Get the table
    let identifier = Identifier::new("default", "simple_log_table");

    let table = catalog
        .get_table(&identifier)
        .await
        .expect("Failed to get table");

    // Scan the table
    let read_builder = table.new_read_builder();
    let read = read_builder.new_read().expect("Failed to create read");
    let scan = read_builder.new_scan();

    let plan = scan.plan().await.expect("Failed to plan scan");

    // Read to Arrow: convert the planned splits into a stream of RecordBatches.
    let stream = read
        .to_arrow(plan.splits())
        .expect("Failed to create arrow stream");

    // Drain the whole stream; any read error fails the test here.
    let batches: Vec<_> = stream
        .try_collect()
        .await
        .expect("Failed to collect batches");

    assert!(
        !batches.is_empty(),
        "Expected at least one batch from table"
    );

    // Collect all rows as (id, name) tuples
    let mut actual_rows: Vec<(i32, String)> = Vec::new();

    for batch in &batches {
        // Columns are looked up by name and downcast to the concrete Arrow
        // array types the table schema is expected to produce.
        let id_array = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("Expected Int32Array for id column");
        let name_array = batch
            .column_by_name("name")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("Expected StringArray for name column");

        for i in 0..batch.num_rows() {
            actual_rows.push((id_array.value(i), name_array.value(i).to_string()));
        }
    }

    // Expected data: (1, 'alice'), (2, 'bob'), (3, 'carol')
    let expected_rows = vec![
        (1, "alice".to_string()),
        (2, "bob".to_string()),
        (3, "carol".to_string()),
    ];

    // Sort for consistent comparison — batch/row order from the reader is not
    // assumed to be deterministic.
    actual_rows.sort_by_key(|(id, _)| *id);

    assert_eq!(
        actual_rows, expected_rows,
        "Rows should match expected values"
    );
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| mod reader; | ||
|
|
||
| pub use crate::arrow::reader::ArrowReaderBuilder; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use crate::io::{FileIO, FileRead, FileStatus}; | ||
| use crate::table::ArrowRecordBatchStream; | ||
| use crate::{DataSplit, Error}; | ||
| use async_stream::try_stream; | ||
| use bytes::Bytes; | ||
| use futures::future::BoxFuture; | ||
| use futures::{StreamExt, TryFutureExt}; | ||
| use parquet::arrow::arrow_reader::ArrowReaderOptions; | ||
| use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch}; | ||
| use parquet::arrow::ParquetRecordBatchStreamBuilder; | ||
| use parquet::file::metadata::ParquetMetaData; | ||
| use parquet::file::metadata::ParquetMetaDataReader; | ||
| use std::ops::Range; | ||
| use std::sync::Arc; | ||
| use tokio::try_join; | ||
|
|
||
| /// Builder to create ArrowReader | ||
| pub struct ArrowReaderBuilder { | ||
| batch_size: Option<usize>, | ||
| file_io: FileIO, | ||
| } | ||
|
|
||
| impl ArrowReaderBuilder { | ||
| /// Create a new ArrowReaderBuilder | ||
| pub(crate) fn new(file_io: FileIO) -> Self { | ||
| ArrowReaderBuilder { | ||
| batch_size: None, | ||
| file_io, | ||
| } | ||
| } | ||
|
|
||
| /// Build the ArrowReader. | ||
| pub fn build(self) -> ArrowReader { | ||
| ArrowReader { | ||
| batch_size: self.batch_size, | ||
| file_io: self.file_io, | ||
| } | ||
| } | ||
| } | ||
|
|
||
/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
    // Optional rows-per-batch override; `None` means the parquet reader's
    // default batch size is used.
    batch_size: Option<usize>,
    // Handle used to open input files from the underlying storage.
    file_io: FileIO,
}
|
|
||
impl ArrowReader {
    /// Take a stream of DataSplits and read every data file in each split.
    /// Returns a stream of Arrow RecordBatches from all files.
    ///
    /// Path validation happens eagerly, before any I/O: if any referenced
    /// file does not end in `.parquet`, this returns `Error::Unsupported`
    /// instead of a stream.
    pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
        let file_io = self.file_io.clone();
        let batch_size = self.batch_size;
        // Flatten every data-file path across all splits.
        // NOTE(review): assumes `data_file_entries()` yields (path, _) pairs —
        // confirm against the DataSplit definition.
        let paths_to_read: Vec<String> = data_splits
            .iter()
            .flat_map(|ds| ds.data_file_entries().map(|(p, _)| p))
            .map(|p| {
                // Case-insensitive extension check; only parquet is supported
                // for now.
                if !p.to_ascii_lowercase().ends_with(".parquet") {
                    Err(Error::Unsupported {
                        message: format!(
                            "unsupported file format: only .parquet is supported, got: {p}"
                        ),
                    })
                } else {
                    Ok(p)
                }
            })
            // Short-circuits on the first unsupported path.
            .collect::<Result<Vec<_>, _>>()?;
        Ok(try_stream! {
            // Files are read sequentially; all batches of one file are
            // yielded before moving to the next.
            for path_to_read in paths_to_read {
                let parquet_file = file_io.new_input(&path_to_read)?;

                // Fetch the file status and open the reader concurrently;
                // the status's size is later used to locate the parquet
                // footer in `ArrowFileReader::get_metadata`.
                let (parquet_metadata, parquet_reader) = try_join!(
                    parquet_file.metadata(),
                    parquet_file.reader()
                )?;

                let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

                let mut batch_stream_builder =
                    ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
                        .await?;

                // Apply the caller-requested batch size, if any.
                if let Some(size) = batch_size {
                    batch_stream_builder = batch_stream_builder.with_batch_size(size);
                }

                let mut batch_stream = batch_stream_builder.build()?;

                // Forward each RecordBatch; `try_stream!` converts a `?`
                // failure into a terminal stream error.
                while let Some(batch) = batch_stream.next().await {
                    yield batch?
                }
            }
        }
        .boxed())
    }
}
|
|
||
/// ArrowFileReader is a wrapper around a FileRead that implements parquet's AsyncFileReader.
///
/// # TODO
///
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
/// contains the following hints to speed up metadata loading, similar to iceberg, we can consider adding them to this struct:
///
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
struct ArrowFileReader<R: FileRead> {
    // File status; its `size` is used as the total file length when loading
    // the parquet footer in `get_metadata`.
    meta: FileStatus,
    // Underlying range reader for the file's bytes.
    r: R,
}

impl<R: FileRead> ArrowFileReader<R> {
    /// Create a new ArrowFileReader
    fn new(meta: FileStatus, r: R) -> Self {
        Self { meta, r }
    }

    /// Read the given byte range from the underlying reader, mapping any
    /// error into `ParquetError::External` so it can flow through the
    /// parquet async-reader traits.
    fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(self.r.read(range.start..range.end).map_err(|err| {
            let err_msg = format!("{err}");
            parquet::errors::ParquetError::External(err_msg.into())
        }))
    }
}
|
|
||
impl<R: FileRead> MetadataFetch for ArrowFileReader<R> {
    // Footer/metadata byte fetches delegate to the shared range reader.
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.read_bytes(range)
    }
}
|
|
||
impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
    // Arbitrary byte-range reads requested by the parquet reader.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.read_bytes(range)
    }

    /// Load the complete parquet metadata (footer) for this file.
    ///
    /// Uses `self.meta.size` as the total file length so the footer can be
    /// located, and forwards any caller-supplied metadata options.
    fn get_metadata(
        &mut self,
        options: Option<&ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        // Clone the options out of the borrowed argument so the async block
        // below does not capture the `&ArrowReaderOptions` borrow.
        let metadata_opts = options.map(|o| o.metadata_options().clone());
        Box::pin(async move {
            let file_size = self.meta.size;
            let metadata = ParquetMetaDataReader::new()
                .with_metadata_options(metadata_opts)
                .load_and_finish(self, file_size)
                .await?;
            Ok(Arc::new(metadata))
        })
    }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only parquet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's only support parquet now. We can support more in next following pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌