
[AMORO-4186] Add support for Parquet row-group merging in IcebergRewriteExecutor #4188

Open

zhangwl9 wants to merge 7 commits into apache:master from zhangwl9:AMORO-add-parquet_row_group_merge-dev

Conversation

@zhangwl9 (Contributor) commented Apr 17, 2026

Why are the changes needed?

Closes #4186.

Brief change log

  1. ParquetFileMergeRunner

    • Initializes ParquetFileWriter with target row group size
    • Appends input files via ParquetFileWriter.appendFile() (zero-copy row-group merging; see the sketch after this list)
    • Builds Iceberg DataFile with correct metrics via ParquetUtil.footerMetrics()
    • Handles failure cleanup (deletes partial output on error)
  2. IcebergRewriteExecutor enhancements:

    • canParquetRowGroupMerge() precondition checker validates:
      • Table has no sort order
      • All input files are Parquet format
      • No encrypted data files
      • No equality delete or position delete files
      • No bloom filters enabled
      • All files match current partition spec
      • All input files have consistent Parquet schemas
    • parquetRowGroupMergeFiles() merges files respecting self-optimizing.target-size boundaries
    • Falls back to row-based rewrite if any precondition fails or merge throws
  3. ParquetIOBridge — Package-scoped bridge in org.apache.iceberg.parquet that exposes Iceberg's ParquetIO.file() conversions (package-private) for the merge path.

  4. Configuration:

    • self-optimizing.rewrite.use-parquet-row-group-merge.enabled (default: false)
  5. Test:

  • Encryption, bloom filter, sort order, spec evolution rejection cases
  • Schema mismatch detection (column addition, type promotion)
  • End-to-end merge with 5 source files → 2 output files, data integrity validation post-commit
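
For illustration, the core of the zero-copy path sketched above looks roughly like the following. This is a minimal sketch assuming Hadoop-backed input/output files; the wrapper class and the hard-coded row-group size are illustrative, not the exact code in this PR:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;

class RowGroupMergeSketch {
  // Merges the inputs into one Parquet file without decoding rows.
  // appendFile() copies each source row group byte-for-byte, so the output
  // keeps the inputs' row-group layout; the rowGroupSize argument only
  // applies to rows written directly, not to appended groups.
  static void merge(List<Path> inputs, Path output, MessageType schema, Configuration conf)
      throws IOException {
    ParquetFileWriter writer =
        new ParquetFileWriter(
            HadoopOutputFile.fromPath(output, conf),
            schema, // all inputs must share this schema (validated by the gate)
            ParquetFileWriter.Mode.CREATE,
            128L * 1024 * 1024, // rowGroupSize
            0); // maxPaddingSize
    writer.start();
    for (Path input : inputs) {
      writer.appendFile(HadoopInputFile.fromPath(input, conf));
    }
    writer.end(Collections.emptyMap()); // no extra footer key/value metadata
  }
}
```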

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Copilot AI review requested due to automatic review settings April 17, 2026 10:04
@github-actions github-actions Bot added the type:docs Improvements or additions to documentation label Apr 17, 2026
@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch from c082fa6 to ccd5ef5 Compare April 17, 2026 10:10
Copilot AI left a comment

Pull request overview

Adds an optional “zero-copy” Parquet row-group merge path for Iceberg rewrite/compaction to reduce CPU/memory overhead versus row-based rewrite, gated by table properties and safety preconditions.

Changes:

  • Introduces ParquetFileMergeRunner to merge Parquet files via ParquetFileWriter.appendFile() and build output DataFile metrics.
  • Enhances IcebergRewriteExecutor to detect eligibility and execute row-group merge with fallback to row-based rewrite on failure.
  • Adds configuration + tests, and a small Iceberg-package bridge (ParquetIOBridge) to reuse Iceberg’s Parquet IO adapters.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

Summary per file:
  • docs/user-guides/configurations.md: documents the new opt-in table property for Parquet row-group merge rewrite.
  • amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java: adds eligibility and correctness tests for row-group merge, including negative cases.
  • amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java: bridge to access Iceberg's package-scoped Parquet IO conversions for the merge path.
  • amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java: adds the new table property constant and default.
  • amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java: new runner implementing Parquet row-group merging and output DataFile construction.
  • amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java: adds eligibility checks and the row-group merge rewrite implementation with fallback.
  • amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java: makes rewriterDataFiles() overridable by changing visibility to protected.


@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch 3 times, most recently from d09cb85 to ad97474 Compare April 17, 2026 11:16
@codecov-commenter
Codecov Report

❌ Patch coverage is 76.19048% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 22.94%. Comparing base (b7f7de3) to head (ad97474).
⚠️ Report is 3 commits behind head on master.

Files with missing lines Patch % Lines
...pache/amoro/optimizing/IcebergRewriteExecutor.java 82.17% 12 Missing and 11 partials ⚠️
...pache/amoro/optimizing/ParquetFileMergeRunner.java 62.06% 18 Missing and 4 partials ⚠️

❗ There is a different number of reports uploaded between BASE (b7f7de3) and HEAD (ad97474).

HEAD has 1 upload less than BASE
Flag BASE (b7f7de3) HEAD (ad97474)
core 1 0
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4188      +/-   ##
============================================
- Coverage     29.75%   22.94%   -6.82%     
+ Complexity     4258     2674    -1584     
============================================
  Files           677      463     -214     
  Lines         54744    42720   -12024     
  Branches       6968     6028     -940     
============================================
- Hits          16288     9801    -6487     
+ Misses        37246    32069    -5177     
+ Partials       1210      850     -360     
Flag Coverage Δ
core ?
trino 22.94% <76.19%> (?)

Flags with carried forward coverage won't be shown.

Copilot AI left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.



@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch from ad97474 to 69ef68b Compare April 20, 2026 01:43
@klion26 (Member) commented Apr 20, 2026

cc @lintingbin

@lintingbin (Contributor)

Hi @zhangwl9, thanks for the PR! I reviewed it against the ongoing Iceberg community work on the same feature (apache/iceberg#14435) to cross-check which concerns apply here. Sharing my findings below.

Context: why not just wait for Iceberg?

Iceberg #14435 is still in flight — row-lineage handling (V3 _row_id / _last_updated_sequence_number), API shape, and the expectedOutputFiles==1 question are all unresolved as of the latest round of comments. Even once it lands, its entry point is SparkParquetFileMergeRunner (driver/executor split, TaskContext.taskAttemptId()), which Amoro's engine-agnostic optimizer cannot reuse directly. @pvary suggested moving the checks/merge core into a common ParquetFileMerger in iceberg-core so Flink and others can share it, but that refactor hasn't landed yet either.

So I think shipping an Amoro-native implementation now is the right call, as a transitional impl that we can replace once Iceberg exposes a reusable core. That framing changes what "must fix before merge" means:

Recommendation: scope this PR to V2 tables and keep the surface minimal

If we explicitly skip V3 row-lineage tables (fallback gate), the big Iceberg-side unknowns (_row_id preservation, _last_updated_sequence_number inheritance, null-stats edge cases) simply don't apply to us, and we can ship a much tighter patch.

Must-fix before merge (4 items)

  1. Add a V3 row-lineage gate. If the table is V3 and row lineage is enabled, return false from canParquetRowGroupMerge() so we fall back. This keeps us out of territory the Iceberg PR hasn't finalized.

  2. parquetRowGroupMergeFiles() should call this.targetSize(), not super.targetSize(). IcebergRewriteExecutor.targetSize() already overrides to return Long.MAX_VALUE when the input total is below target (see #3645). Bypassing the override makes the row-group-merge path split into multiple files where the row-based path would consolidate into one — inconsistent with the intent of #3645 ([Improvement]: Optimize target file size after self-optimizing). A sketch follows this list.

  3. Clean up already-written output files when fallback is triggered. Once a rolled-over ParquetFileMergeRunner.result() has succeeded, the corresponding DataFile is appended to outputFiles; if a later runner throws and we catch → super.rewriterDataFiles(), those earlier physical files are never deleted. ParquetFileMergeRunner.close() only handles the current runner. Please add a best-effort io.deleteFile() sweep over the accumulated outputFiles in the catch/finally (a sketch of the sweep follows this list).

  4. hasNoBloomFilter() misses the global switch. It only inspects write.parquet.bloom-filter-enabled.column.* (column prefix). Iceberg also supports the table-wide write.parquet.bloom-filter-enabled.default — tables with that on would silently lose bloom filters after merge. Please check both (see the sketch after this list).
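
To make item 2 concrete, here is the rough shape of the override that the merge path should pick up via this.targetSize() (per #3645). This is a method-level fragment; the input and field names are assumptions, not the executor's actual members:

```java
// Sketch only: consolidate into a single file when the whole input fits
// under the configured target, instead of splitting by the raw target size.
@Override
protected long targetSize() {
  long totalInputBytes =
      inputDataFiles.stream().mapToLong(DataFile::fileSizeInBytes).sum();
  return totalInputBytes <= configuredTargetSize ? Long.MAX_VALUE : configuredTargetSize;
}
```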
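For item 3, the suggested best-effort sweep could look like the following; outputFiles, io, and the enclosing try/catch are assumed names based on this PR's description, while DataFile#path() and FileIO#deleteFile() are real Iceberg APIs:

```java
try {
  return parquetRowGroupMergeFiles(input);
} catch (Exception e) {
  LOG.warn("Row-group merge failed, falling back to row-based rewrite", e);
  // Best-effort cleanup of outputs from runners that already completed.
  for (DataFile written : outputFiles) {
    try {
      io.deleteFile(written.path().toString());
    } catch (Exception cleanupError) {
      LOG.warn("Failed to delete partial merge output {}", written.path(), cleanupError);
    }
  }
  return super.rewriterDataFiles();
}
```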
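And for item 4, a broadened check covering both switches; the property strings are as given above, and the method name mirrors the PR's hasNoBloomFilter():

```java
private boolean hasNoBloomFilter(Map<String, String> properties) {
  // Table-wide switch the original check missed.
  boolean defaultEnabled =
      Boolean.parseBoolean(
          properties.getOrDefault("write.parquet.bloom-filter-enabled.default", "false"));
  // Per-column switches: write.parquet.bloom-filter-enabled.column.<name> = true
  boolean anyColumnEnabled =
      properties.entrySet().stream()
          .anyMatch(
              e -> e.getKey().startsWith("write.parquet.bloom-filter-enabled.column.")
                  && Boolean.parseBoolean(e.getValue()));
  return !defaultEnabled && !anyColumnEnabled;
}
```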

Deferrable (track against Iceberg's final decisions)

These are real issues but either safe-by-design or minor, and fixing them now risks diverging from whatever Iceberg finalizes. I'd defer:

  • Schema comparison via MessageType.equals() — Iceberg comments argue for field-id-based comparison. MessageType.equals() is stricter (more false negatives → more fallbacks), so it errs on the safe side. Align with Iceberg once the upstream shape stabilizes.
  • Deprecated 7-arg ParquetFileWriter constructor — same issue Guosmilesmile flagged on the Iceberg PR; they ended up wrapping Iceberg's own ParquetWriter. Works today; worth replacing when we pull their approach.
  • columnIndexTruncateLength passed as null — Parquet falls back to an internal default. Safe, but should use ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH once we touch this area again.
  • Each input file is opened twice (once in checkSchemaAndBuildContext, once by ParquetFileWriter.appendFile) — perf concern on object stores, not correctness. Iceberg's final form propagates the parsed metadata to avoid re-reads.
  • rowGroupSize param is a no-op for appendFile() — byte-level row-group copies preserve the source file's row-group size. Please add a note in configurations.md so users don't expect this property to resize groups after merge.
  • Schema check happens inside parquetRowGroupMergeFiles() and throws IllegalStateException (see testParquetRowGroupMergeWithSchemaDifferent) — functionally fine today because no writer is open yet when the check returns null, but it's fragile. A small refactor to move schema validation into the gate would eliminate the try/catch-and-retry shape entirely.
  • No per-input-file size guard. The gate doesn't reject inputs that are already ≥ target size, so we still byte-copy them via appendFile. On the Iceberg PR, pvary also pointed out that row-group merge shouldn't be used to split big files into smaller ones. Low impact (copy is cheap) but a cheap gate (e.g. skip if a single input already exceeds targetSize * 0.9) would avoid unnecessary work and align with the Iceberg discussion.

Nits (optional)

  • checkCondition logs skip reasons at DEBUG — consider one aggregated INFO line per task so operators can see why the merge path was declined.
  • ParquetIOBridge could get a // TODO: remove once Iceberg exposes ParquetIO.file() as public pointing at #14435.
  • AbstractRewriteFilesExecutor.rewriterDataFiles() becoming protected extends the subclass API — consider a named hook like rewriteDataFilesRowBased() instead.
  • MetricsConfig.forTable(table.asUnkeyedTable()) — worth double-checking behavior if the parent MixedTable is keyed; encryptionManager() in the base class has a keyed/unkeyed split for this reason.

Positive notes

  • Gate is conservative in the right way — encryption, deletes, sort order, spec evolution, format, bloom filter (modulo must-fix item 4 above) are all caught.
  • Fallback wrapping (try/catch Exception → super.rewriterDataFiles()) gives a safety net even if a gate misses something.
  • Test coverage mirrors the Iceberg discussion well (encryption, bloom filter, sort order, spec evolution, schema mismatch, end-to-end split-and-commit data integrity).

Thanks again — the direction is solid, and with the four must-fix items plus a V3 gate I think this is a good transitional implementation that we can later swap for Iceberg's core ParquetFileMerger once it lands.

@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch 2 times, most recently from e7e0e04 to 2197f9e Compare April 20, 2026 04:51
@pvary commented Apr 20, 2026

@lintingbin, @zhangwl9: I don’t think the RowGroup‑merging feature will make it into the Iceberg codebase. The core issue is that while this approach merges small files and reduces the file count, it does not reduce the number of RowGroups—they remain unchanged. As a result, the performance penalty of scanning Parquet files with many RowGroups still exists, so this type of compaction doesn’t really solve the underlying problem.

@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch 6 times, most recently from 2aefa92 to c21f478 Compare April 20, 2026 09:10
@zhangwl9 (Contributor, Author)

@lintingbin I have fixed the four "Must-fix before merge" items and some of the issues listed under "Deferrable (track against Iceberg's final decisions)".

@lintingbin (Contributor)

> @lintingbin, @zhangwl9: I don’t think the RowGroup‑merging feature will make it into the Iceberg codebase. The core issue is that while this approach merges small files and reduces the file count, it does not reduce the number of RowGroups—they remain unchanged. As a result, the performance penalty of scanning Parquet files with many RowGroups still exists, so this type of compaction doesn’t really solve the underlying problem.

So can we add an additional parameter threshold? For example, use row-group merging only for files larger than a certain size in MB, while smaller files continue to use the old merging method?

@lintingbin (Contributor)

Thanks @pvary, that's a fair point worth addressing head-on.

You're right that appendFile preserves row-group layout byte-for-byte, so the merged output keeps the same number of row groups as the sum of its inputs. Tables that suffer from row-group-level fragmentation (many undersized row groups, poor vectorized scan throughput, extra page-index overhead) won't see query-side improvement from this path — a full row-level rewrite is still the right tool there.

That said, I think there's a narrower but legitimate scenario where this is net-positive in Amoro's self-optimizing context:

  1. File-count / manifest reduction is a primary goal in its own right. Streaming ingestion in Amoro commonly produces 10s-100s of small files per partition per hour. Manifest bloat and Iceberg planning cost scale with file count well before a query even starts. For users whose writers already produce reasonably-sized row groups (e.g. Flink checkpoint output, batch writes), the bottleneck they actually want to relieve is file count, not row-group count.

  2. Byte-copy is an order of magnitude cheaper than re-encode. For tables that compact frequently under tight CPU budget, trading "perfect row-group layout" for "much lower compaction cost" is a reasonable opt-in choice — as long as it's opt-in and the trade-off is understood.

So the real question is: how do we gate this so it only triggers when the trade-off is actually favorable?

My earlier suggestion of a file-size threshold is in the right direction, but file size alone is a proxy — a 128MB file with thirty-two 4MB row groups is exactly the case you're worried about, and a simple size check would wave it through. Since we already open every input in checkSchemaAndBuildContext() to read the footer, footer.getBlocks() is on hand for free. I'd propose gating on the actual row-group layout instead:

  • Average row-group size of input files ≥ threshold (e.g. a configurable self-optimizing.rewrite.parquet-row-group-merge.min-avg-row-group-bytes), and/or
  • Cap on total row-group count in the merged output (so concatenating 100 well-formed inputs doesn't produce a 100-row-group file on the other side).

If either check fails, we fall back to row-level rewrite — the path that can actually fix row-group fragmentation. That keeps this PR scoped to cases where byte-copy is genuinely net-positive, without pretending to be a general replacement for the full rewrite.
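
A sketch of that gate, assuming the footers already parsed in checkSchemaAndBuildContext() are on hand as ParquetMetadata; the threshold parameters correspond to the proposed (not yet final) properties:

```java
import java.util.List;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

final class RowGroupLayoutGate {
  // appendFile() preserves row groups, so the merged file carries exactly the
  // sum of the inputs' row groups. Reject layouts where a byte-copy merge
  // would produce a fragmented output.
  static boolean eligible(
      List<ParquetMetadata> footers, long minAvgRowGroupBytes, int maxMergedRowGroups) {
    long totalBytes = 0L;
    int totalGroups = 0;
    for (ParquetMetadata footer : footers) {
      for (BlockMetaData block : footer.getBlocks()) {
        totalBytes += block.getCompressedSize();
        totalGroups++;
      }
    }
    if (totalGroups == 0 || totalGroups > maxMergedRowGroups) {
      return false;
    }
    return (totalBytes / totalGroups) >= minAvgRowGroupBytes;
  }
}
```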

WDYT — does this framing address the concern?

@lintingbin (Contributor)

> @lintingbin I have fixed the four "Must-fix before merge" items and some of the issues listed under "Deferrable (track against Iceberg's final decisions)".

Okay, the issues I was concerned about earlier have all been fixed. Next, we can discuss the problem of having too many row groups together.

@pvary commented Apr 20, 2026

@lintingbin: If you look at the code in the Iceberg PR, you’ll see that implementing this is quite complex. Given the limited number of use cases, it’s unlikely the Iceberg community will want to carry and maintain that level of complexity for it.

@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch from c21f478 to 04d3b93 Compare April 21, 2026 01:49
@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch from 04d3b93 to a4998a0 Compare April 21, 2026 01:50
@zhangwl9 (Contributor, Author)

@lintingbin This parameter "self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-bytes" is required; I have already added the necessary logic.
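
For illustration, opting in through Iceberg's table-properties API might look like the following; the 32 MB threshold is an arbitrary example value, and the property keys are the ones introduced in this PR:

```java
table.updateProperties()
    .set("self-optimizing.rewrite.use-parquet-row-group-merge.enabled", "true")
    .set(
        "self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-bytes",
        String.valueOf(32L * 1024 * 1024))
    .commit();
```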

@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch 2 times, most recently from 9319db8 to 5e372ed Compare April 21, 2026 08:36
@zhangwl9 (Contributor, Author)

zhangwl9 commented Apr 22, 2026

@xxubai @zhoujinsong If you have some free time, could you help me review the code? Thanks.

@zhangwl9 zhangwl9 force-pushed the AMORO-add-parquet_row_group_merge-dev branch from 31e456d to cc0fbbb Compare April 23, 2026 09:39

Labels

type:docs Improvements or additions to documentation


Development

Successfully merging this pull request may close these issues.

[Improvement]: Implement row group merging using ParquetFileWriter#appendFile

6 participants