Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ pub mod file_index;
pub mod io;
pub mod spec;
mod table;
pub use table::{DataSplit, Plan, Table};
pub use table::{DataSplit, DataSplitBuilder, Plan, SnapshotManager, Table, TableScan};
5 changes: 3 additions & 2 deletions crates/paimon/src/spec/manifest_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct ManifestEntry {
total_buckets: i32,

#[serde(rename = "_FILE")]
file: DataFileMeta,
pub(crate) file: DataFileMeta,

#[serde(rename = "_VERSION")]
version: i32,
Expand All @@ -59,7 +59,8 @@ impl ManifestEntry {
&self.kind
}

fn partition(&self) -> &Vec<u8> {
/// Partition bytes for this entry (for grouping splits).
pub fn partition(&self) -> &[u8] {
&self.partition
}

Expand Down
4 changes: 4 additions & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ pub use index_file_meta::*;

mod index_manifest;
mod manifest;
pub use manifest::Manifest;
mod manifest_common;
pub use manifest_common::FileKind;
mod manifest_entry;
pub use manifest_entry::ManifestEntry;
mod objects_file;
pub use objects_file::from_avro_bytes;
mod stats;
mod types;
pub use types::*;
12 changes: 11 additions & 1 deletion crates/paimon/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

mod snapshot_manager;
mod source;
mod table_scan;

pub use source::{DataSplit, Plan};
pub use snapshot_manager::SnapshotManager;
pub use source::{DataSplit, DataSplitBuilder, Plan};
pub use table_scan::TableScan;

use crate::catalog::Identifier;
use crate::io::FileIO;
Expand Down Expand Up @@ -71,4 +74,11 @@ impl Table {
pub fn file_io(&self) -> &FileIO {
&self.file_io
}

/// Create a table scan for full table read (no incremental, no predicate).
///
/// Reference: [pypaimon TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py).
pub fn new_scan(&self) -> TableScan {
TableScan::new(self.clone())
}
}
14 changes: 9 additions & 5 deletions crates/paimon/src/table/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct DataSplit {
partition: BinaryRow,
bucket: i32,
bucket_path: String,
total_buckets: Option<i32>,
total_buckets: i32,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java uses a nullable Integer for a potentially null value, but I think it won't be null in newer Paimon versions, so I removed the Option.
For reference: apache/paimon#5537 may be a related PR.

data_files: Vec<DataFileMeta>,
}

Expand All @@ -51,7 +51,7 @@ impl DataSplit {
pub fn bucket_path(&self) -> &str {
&self.bucket_path
}
pub fn total_buckets(&self) -> Option<i32> {
pub fn total_buckets(&self) -> i32 {
self.total_buckets
}

Expand All @@ -78,7 +78,7 @@ pub struct DataSplitBuilder {
partition: Option<BinaryRow>,
bucket: i32,
bucket_path: Option<String>,
total_buckets: Option<i32>,
total_buckets: i32,
data_files: Option<Vec<DataFileMeta>>,
}

Expand All @@ -89,7 +89,7 @@ impl DataSplitBuilder {
partition: None,
bucket: -1,
bucket_path: None,
total_buckets: None,
total_buckets: -1,
data_files: None,
}
}
Expand All @@ -110,10 +110,14 @@ impl DataSplitBuilder {
self.bucket_path = Some(bucket_path);
self
}
pub fn with_total_buckets(mut self, total_buckets: Option<i32>) -> Self {
pub fn with_total_buckets(mut self, total_buckets: i32) -> Self {
self.total_buckets = total_buckets;
self
}
pub fn with_data_files(mut self, data_files: Vec<DataFileMeta>) -> Self {
self.data_files = Some(data_files);
self
}

pub fn build(self) -> crate::Result<DataSplit> {
if self.snapshot_id == -1 {
Expand Down
187 changes: 187 additions & 0 deletions crates/paimon/src/table/table_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! TableScan for full table scan.
//!
//! Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py)
//! and [FullStartingScanner](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/scanner/full_starting_scanner.py).

use super::Table;
use crate::io::FileIO;
use crate::spec::{BinaryRow, FileKind, ManifestEntry, Snapshot};
use crate::table::source::{DataSplitBuilder, Plan};
use crate::table::SnapshotManager;
use crate::Error;
use std::collections::{HashMap, HashSet};

/// Path segment for the manifest directory under the table root
/// (both manifest list files and manifest files live here).
const MANIFEST_DIR: &str = "manifest";

/// Reads a manifest list file (Avro) and returns manifest file metas.
/// Reads a manifest list file (Avro-encoded) located at
/// `<table_path>/manifest/<list_name>` and decodes it into manifest file metas.
///
/// A missing file is treated as an empty list rather than an error, since a
/// snapshot may reference a list that holds no manifests.
async fn read_manifest_list(
    file_io: &FileIO,
    table_path: &str,
    list_name: &str,
) -> crate::Result<Vec<crate::spec::ManifestFileMeta>> {
    let root = table_path.trim_end_matches('/');
    let list_path = format!("{root}/{MANIFEST_DIR}/{list_name}");

    let input = file_io.new_input(&list_path)?;
    if input.exists().await? {
        let bytes = input.read().await?;
        crate::spec::from_avro_bytes::<crate::spec::ManifestFileMeta>(&bytes)
    } else {
        Ok(Vec::new())
    }
}

/// Reads all manifest entries for a snapshot (base + delta manifest lists, then each manifest file).
/// Reads every manifest entry reachable from `snapshot`: the base and delta
/// manifest lists are read first (base before delta), then each referenced
/// manifest file is loaded and its entries are appended in order.
async fn read_all_manifest_entries(
    file_io: &FileIO,
    table_path: &str,
    snapshot: &Snapshot,
) -> crate::Result<Vec<ManifestEntry>> {
    let base = read_manifest_list(file_io, table_path, snapshot.base_manifest_list()).await?;
    let delta = read_manifest_list(file_io, table_path, snapshot.delta_manifest_list()).await?;

    let manifest_dir = format!("{}/{}", table_path.trim_end_matches('/'), MANIFEST_DIR);
    let mut entries = Vec::new();
    // TODO: consider reading manifest files concurrently.
    for meta in base.into_iter().chain(delta) {
        let manifest_path = format!("{}/{}", manifest_dir, meta.file_name());
        entries.extend(crate::spec::Manifest::read(file_io, &manifest_path).await?);
    }
    Ok(entries)
}

/// Merges add/delete manifest entries: keeps only ADD entries whose (partition, bucket, file_name) is not in DELETE set.
fn merge_manifest_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestEntry> {
let mut deleted = HashSet::new();
let mut added = Vec::new();
for e in entries {
// follow python code to use partition, bucket, filename as duplicator
let key = (
e.partition().to_vec(),
e.bucket(),
e.file().file_name.clone(),
);
match e.kind() {
FileKind::Add => added.push(e),
FileKind::Delete => {
deleted.insert(key);
}
}
}
added
.into_iter()
.filter(|e| {
!deleted.contains(&(
e.partition().to_vec(),
e.bucket(),
e.file().file_name.clone(),
))
})
.collect()
}

/// TableScan for full table scan (no incremental, no predicate).
///
/// Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
#[derive(Debug, Clone)]
pub struct TableScan {
    // Owned handle to the table being scanned; provides file IO and location.
    table: Table,
}

impl TableScan {
pub fn new(table: Table) -> Self {
Self { table }
}

/// Plan the full scan: read latest snapshot, manifest list, manifest entries, then build one DataSplit per (partition, bucket).
pub async fn plan(&self) -> crate::Result<Plan> {
let file_io = self.table.file_io();
let table_path = self.table.location();
let snapshot_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());

let snapshot = match snapshot_manager.get_latest_snapshot().await? {
Some(s) => s,
None => return Ok(Plan::new(Vec::new())),
};
Self::plan_snapshot(snapshot, file_io, table_path).await
}

pub async fn plan_snapshot(
snapshot: Snapshot,
file_io: &FileIO,
table_path: &str,
) -> crate::Result<Plan> {
let entries = read_all_manifest_entries(file_io, table_path, &snapshot).await?;
let entries = merge_manifest_entries(entries);
if entries.is_empty() {
return Ok(Plan::new(Vec::new()));
}

// Group by (partition, bucket). Key = (partition_bytes, bucket).
let mut groups: HashMap<(Vec<u8>, i32), Vec<ManifestEntry>> = HashMap::new();
for e in entries {
let key = (e.partition().to_vec(), e.bucket());
groups.entry(key).or_default().push(e);
}

let snapshot_id = snapshot.id();
let base_path = table_path;
let mut splits = Vec::new();

for ((_partition, bucket), group_entries) in groups {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@luoyuxia Hi, for partitioned tables, _partition is dropped right after grouping, but the split is later built with BinaryRow::new(0) and "{table_path}/bucket-{bucket}". This is fine for unpartitioned tables, but for partitioned tables, it loses partition identity, and bucket_path also misses the partition directory prefix (k=v/...). It would be better to reconstruct the partition from the grouped partition bytes, build splits with the real partition, and generate bucket_path as partition_path/bucket-{bucket}. What do you think of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the todo comment in the following code notes, I'll create a separate PR to support partitioned tables, since I want to keep each PR as small as possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as todo comment in the following code, I'll create seperate pr to support partitioned table since I want to make each pr as small as possible

I see. Once the tasks are separated, feel free to assign them to me—I'd be delighted to participate.

let total_buckets = group_entries
.first()
.map(|e| e.total_buckets())
.ok_or_else(|| Error::UnexpectedError {
message: format!("Manifest entry group for bucket {bucket} is empty, cannot determine total_buckets"),
source: None,
})?;
let mut data_files = Vec::new();

// currently, only group by splits by bucket
// todo: consider use binpack to generate split
for manifest_entry in group_entries {
let ManifestEntry { file, .. } = manifest_entry;
data_files.push(file);
}

// todo: consider partitioned table
let bucket_path = format!("{base_path}/bucket-{bucket}");
let partition = BinaryRow::new(0);

let split = DataSplitBuilder::new()
.with_snapshot(snapshot_id)
.with_partition(partition)
.with_bucket(bucket)
.with_bucket_path(bucket_path)
.with_total_buckets(total_buckets)
.with_data_files(data_files)
.build()?;
splits.push(split);
}
Ok(Plan::new(splits))
}
}
Loading