[AMORO-4186] Add support for Parquet row-group merging in IcebergRewriteExecutor#4188
zhangwl9 wants to merge 7 commits into apache:master
Conversation
Pull request overview
Adds an optional “zero-copy” Parquet row-group merge path for Iceberg rewrite/compaction to reduce CPU/memory overhead versus row-based rewrite, gated by table properties and safety preconditions.
Changes:
- Introduces `ParquetFileMergeRunner` to merge Parquet files via `ParquetFileWriter.appendFile()` and build output `DataFile` metrics.
- Enhances `IcebergRewriteExecutor` to detect eligibility and execute row-group merge with fallback to row-based rewrite on failure.
- Adds configuration + tests, and a small Iceberg-package bridge (`ParquetIOBridge`) to reuse Iceberg's Parquet IO adapters.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| docs/user-guides/configurations.md | Documents the new opt-in table property for Parquet row-group merge rewrite. |
| amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java | Adds eligibility and correctness tests for row-group merge, including negative cases. |
| amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java | Bridge to access Iceberg’s package-scoped Parquet IO conversions for the merge path. |
| amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java | Adds the new table property constant and default. |
| amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java | New runner implementing Parquet row-group merging and output DataFile construction. |
| amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java | Adds eligibility checks and the row-group merge rewrite implementation with fallback. |
| amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java | Makes rewriterDataFiles() overridable by changing visibility to protected. |
Codecov Report: ❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##            master    #4188     +/-  ##
============================================
- Coverage    29.75%   22.94%    -6.82%
+ Complexity    4258     2674     -1584
============================================
  Files          677      463      -214
  Lines        54744    42720    -12024
  Branches      6968     6028      -940
============================================
- Hits         16288     9801     -6487
+ Misses       37246    32069     -5177
+ Partials      1210      850      -360
```
Flags with carried forward coverage won't be shown.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
cc @lintingbin
Hi @zhangwl9, thanks for the PR! I reviewed it against the ongoing Iceberg community work on the same feature (apache/iceberg#14435) to cross-check which concerns apply here. Sharing my findings below.

**Context: why not just wait for Iceberg?** Iceberg #14435 is still in flight — row-lineage handling (V3) is among the open questions. So I think shipping an Amoro-native implementation now is the right call, as a transitional implementation that we can replace once Iceberg exposes a reusable core. That framing changes what "must fix before merge" means.

**Recommendation: scope this PR to V2 tables and keep the surface minimal.** If we explicitly skip V3 row-lineage tables (fallback gate), the big Iceberg-side unknowns mostly fall away.

**Must-fix before merge (4 items)**

**Deferrable (track against Iceberg's final decisions).** These are real issues, but they are either safe by design or minor, and fixing them now risks diverging from whatever Iceberg finalizes, so I'd defer them.

**Nits (optional)**

**Positive notes**

Thanks again — the direction is solid, and with the four must-fix items plus a V3 gate I think this is a good transitional implementation that we can later swap for Iceberg's core.
@lintingbin, @zhangwl9: I don't think the RowGroup-merging feature will make it into the Iceberg codebase. The core issue is that while this approach merges small files and reduces the file count, it does not reduce the number of RowGroups — they remain unchanged. As a result, the performance penalty of scanning Parquet files with many RowGroups still exists, so this type of compaction doesn't really solve the underlying problem.
@lintingbin I have fixed up the four "Must-fix before merge" items and some of the issues listed under "Deferrable (track against Iceberg's final decisions)".

So can we add an additional parameter threshold? For example, use row-group merging only for files larger than a certain size in MB, while smaller files continue to use the old merging method?
Thanks @pvary, that's a fair point worth addressing head-on. You're right that appending row groups verbatim leaves the row-group count unchanged. That said, I think there's a narrower but legitimate scenario where this is net-positive in Amoro's self-optimizing context.

So the real question is: how do we gate this so it only triggers when the trade-off is actually favorable? My earlier suggestion of a file-size threshold is in the right direction, but file size alone is a proxy — a 128MB file with thirty-two 4MB row groups is exactly the case you're worried about, and a simple size check would wave it through. Since we already open every input in the merge path, we can inspect the footers and check the average row-group size directly.

If either check fails, we fall back to row-level rewrite — the path that can actually fix row-group fragmentation. That keeps this PR scoped to cases where byte-copy is genuinely net-positive, without pretending to be a general replacement for the full rewrite. WDYT — does this framing address the concern?
Okay, the issues I was concerned about earlier have all been fixed. Next, we can discuss the problem of having too many row groups together.
@lintingbin: If you look at the code in the Iceberg PR, you'll see that implementing this is quite complex. Given the limited number of use cases, it's unlikely the Iceberg community will want to carry and maintain that level of complexity for it.
@lintingbin This parameter "self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-bytes" is required; I have already added the necessary logic.
@xxubai @zhoujinsong If you have some free time, could you help me review the code? Thanks.
Why are the changes needed?
Closes #4186.
Brief change log
- `ParquetFileMergeRunner`:
  - Creates a `ParquetFileWriter` with the target row group size
  - Merges inputs via `ParquetFileWriter.appendFile()` (zero-copy row-group merging)
  - Builds the output `DataFile` with correct metrics via `ParquetUtil.footerMetrics()`
- `IcebergRewriteExecutor` enhancements:
  - `canParquetRowGroupMerge()` precondition checker validates eligibility
  - `parquetRowGroupMergeFiles()` merges files respecting `self-optimizing.target-size` boundaries
- `ParquetIOBridge` — package-scoped bridge in `org.apache.iceberg.parquet` that exposes Iceberg's package-private `ParquetIO.file()` conversions for the merge path
- Configuration: `self-optimizing.rewrite.use-parquet-row-group-merge.enabled` (default: `false`)
- Tests
How was this patch tested?
- Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- Run tests locally before making a pull request
Documentation