feat(parquet): stream-encode definition/repetition levels incrementally#9447
alamb merged 7 commits into apache:main
Conversation
Previously, the column writer accumulated raw definition and repetition levels in `Vec<i16>` sinks (`def_levels_sink` / `rep_levels_sink`) and only RLE-encoded them in bulk at page-flush time. Replace the two sinks with streaming `LevelEncoder` fields. Levels are now encoded incrementally as each `write_batch` call arrives, so only the compact encoded bytes are held in memory at all times. At page flush, the encoder is consumed and its bytes are written directly into the page buffer; a fresh encoder is swapped in for the next page. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
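The flush-and-swap flow described above can be sketched roughly as follows. This is an illustrative model only: `StreamingLevelEncoder`, `put`, and `flush` are hypothetical names standing in for the actual `LevelEncoder` in the parquet crate, and simple `(value, run_length)` pairs stand in for real hybrid-RLE output.

```rust
/// Hypothetical minimal run-length encoder for level values: it keeps
/// (value, run_length) pairs instead of one raw i16 per row, so memory
/// stays proportional to the number of runs, not the number of rows.
struct StreamingLevelEncoder {
    runs: Vec<(i16, u32)>,
}

impl StreamingLevelEncoder {
    fn new() -> Self {
        Self { runs: Vec::new() }
    }

    /// Encode a batch of levels incrementally (what `write_batch` would call).
    fn put(&mut self, levels: &[i16]) {
        for &lvl in levels {
            match self.runs.last_mut() {
                // Extend the current run if the level repeats.
                Some((v, n)) if *v == lvl => *n += 1,
                // Otherwise start a new run.
                _ => self.runs.push((lvl, 1)),
            }
        }
    }

    /// Hand back the encoded state at page-flush time, leaving the encoder
    /// ready for the next page.
    fn flush(&mut self) -> Vec<(i16, u32)> {
        let out = self.runs.clone();
        self.runs.clear();
        out
    }
}

fn main() {
    let mut enc = StreamingLevelEncoder::new();
    // A very sparse column: 1_000 nulls (definition level 0), then one value.
    enc.put(&vec![0i16; 1_000]);
    enc.put(&[1]);
    let page = enc.flush();
    // Only two runs are held in memory, not 1_001 raw i16 values.
    assert_eq!(page, vec![(0, 1_000), (1, 1)]);
    println!("runs held for this page: {}", page.len());
}
```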
run benchmark arrow_writer

Benchmark for this request failed. Last 20 lines of output:
etseidl
left a comment
Looks sound to me. Thanks!
parquet/src/encodings/levels.rs (Outdated)

/// Computes max buffer size for level encoder/decoder based on encoding, max
/// repetition/definition level and number of total buffered values (includes null
/// values).
#[allow(dead_code)]
should the dead code be deprecated?
ahh, I didn't notice that the encodings module is experimental...thought this was public. Yes, let's just remove the dead code then :)
I plan to benchmark on my WS since the bot seems tired

run benchmark arrow_writer

Benchmark for this request failed. Last 20 lines of output:
Ahh, I get it. The benchbot can't handle the remote branch also being named "main". Not much difference either way on my laptop. On my WS the times are all over the place :( The huge regressions can show up on either branch (could be too much else going on during benchmarking).
alamb
left a comment
Thank you @HippoBaro, I agree with @etseidl. I think this one is ready to go: we can merge it and address follow-ons in another PR.
Let us know what you prefer.
parquet/src/column/writer/mod.rs (Outdated)

max_rep_level,
)[..],
let encoder = mem::replace(
    &mut self.rep_levels_encoder,
This path recreates the encoders each time (and thus probably allocates, etc)
It seems like the old code path does the same thing (in encode_levels_v1, ..)
@HippoBaro would you be open to exploring adding a "clear" method to the encoder now that you have it encapsulated to save the allocation?
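The allocation difference being discussed might look like this in miniature. `LevelEncoder` here is a stand-in struct with a plain byte buffer, not the real parquet type; `flush_by_replace` mirrors the `mem::replace` pattern quoted above, and `flush_by_clear` sketches the suggested "clear" method.

```rust
use std::mem;

/// Stand-in for the real encoder: just a byte buffer of encoded levels.
struct LevelEncoder {
    buffer: Vec<u8>,
}

impl LevelEncoder {
    fn with_capacity(cap: usize) -> Self {
        Self { buffer: Vec::with_capacity(cap) }
    }

    /// Consume the encoder to obtain its encoded bytes.
    fn consume(self) -> Vec<u8> {
        self.buffer
    }

    /// Hypothetical "clear"-style method: copy out the encoded bytes and
    /// reset the length to zero, retaining the existing capacity.
    fn take_bytes(&mut self) -> Vec<u8> {
        let out = self.buffer.clone();
        self.buffer.clear();
        out
    }
}

/// Old pattern: swap in a freshly allocated encoder on every page flush.
fn flush_by_replace(enc: &mut LevelEncoder) -> Vec<u8> {
    let old = mem::replace(enc, LevelEncoder::with_capacity(1024));
    old.consume()
}

/// Suggested pattern: reuse the encoder's memory across pages.
fn flush_by_clear(enc: &mut LevelEncoder) -> Vec<u8> {
    enc.take_bytes()
}

fn main() {
    let mut enc = LevelEncoder::with_capacity(1024);
    enc.buffer.extend_from_slice(&[1, 2, 3]);

    let page = flush_by_clear(&mut enc);
    assert_eq!(page, vec![1, 2, 3]);
    // Capacity survived the flush: the next page reuses this allocation.
    assert!(enc.buffer.capacity() >= 1024);
    println!("reused capacity: {}", enc.buffer.capacity());
}
```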
Previously, flushing a data page would `mem::replace` each level encoder with a freshly allocated one, consuming the old encoder to get its buffer. This allocated new internal `Vec`s on every page boundary. We now preserve the internal state of the encoder and reuse memory across pages. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
@alamb @etseidl Thank you both for the review! See 0e28fba, which now preserves internal state across pages. This latest change also adds some `#[allow(dead_code)]` statements. Your comment suggests we should address those in a follow-up. I'm happy to open another PR after this one. I can also remove them here if that's more appropriate. Let me know.
Force-pushed 757f11c to 0e28fba
Apologies for the force-push there; using my fork's `main` was confusing the benchbot.
Thanks @HippoBaro, this looks even better now. I'm fine with simply removing the dead code in this PR. This does have me wondering if we should remove the BIT_PACKED encoder. We need a decoder for backwards compatibility, but we shouldn't ever be using it for level encoding.
I made a fake PR to run benchmarks on. Assuming they all look good I will merge this PR.
|
🤔 the benchmarks found an issue: #9636 (comment). It doesn't seem to be introduced by this PR, though. Looking more carefully.
The issue is pre-existing. I made a PR to disable this benchmark temporarily.
The `v1`, `v2`, and `max_buffer_size` functions required knowing the number of values upfront and pre-allocated buffers. All callers have been migrated to the streaming variants (`v1_streaming`, `v2_streaming`), so remove the dead code and switch the last remaining caller in test utils. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
Which issue does this PR close?

- Part of #9637

Rationale for this change

I can't benchmark the arrow-writer changes in #9447 due to hitting a panic: #9637

What changes are included in this PR?

Temporarily disable the cdc benchmarks until the underlying bug is fixed.

Are these changes tested?

Are there any user-facing changes?
I am looking into this one, trying to get CI to pass cleanly (and benchmarks run) so we can merge it in. So close!
The benchmarks on #9636 show no noticeable difference, so I think this one is good to go. Thank you @HippoBaro for bearing with us, and thank you @etseidl for the review!
Which issue does this PR close?
Rationale for this change
When writing a Parquet column with very sparse data, `GenericColumnWriter` accumulates unbounded memory for definition and repetition levels. The raw `i16` values are appended into `Vec<i16>` sinks on every `write_batch` call and only RLE-encoded in bulk when a data page is flushed. For a column that is almost entirely nulls, the actual RLE-encoded output can be tiny, yet the intermediate buffer grows linearly with the number of rows.

What changes are included in this PR?
Replace the two raw-level `Vec<i16>` sinks (`def_levels_sink`/`rep_levels_sink`) with streaming `LevelEncoder` fields (`def_levels_encoder`/`rep_levels_encoder`). Behavior is the same, but we keep running RLE-encoded state in memory rather than the full list of raw levels. Existing logic is reused.

Are these changes tested?
Yes, all tests passing.
Benchmarks show no regression.
`list_primitive` benches improved by 3-5%.

Are there any user-facing changes?
None. Some internal symbols are now unused. I added some `#[allow(dead_code)]` statements since these were experimental-visible and might be externally relied on.
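As a rough back-of-the-envelope check on the rationale above (the row count and per-run byte estimate here are assumptions for illustration, not measurements from this PR):

```rust
fn main() {
    // A column of 10 million rows that are almost entirely nulls.
    let rows: usize = 10_000_000;

    // Before: the Vec<i16> sink holds one raw level per row.
    let raw_sink_bytes = rows * std::mem::size_of::<i16>();
    assert_eq!(raw_sink_bytes, 20_000_000); // ~20 MB of intermediate state

    // After: a single RLE run (run length + repeated level value) covers all
    // the identical levels; call it on the order of a few bytes.
    let rle_estimate_bytes = 8;

    println!(
        "raw sink: {} bytes vs streaming RLE estimate: ~{} bytes",
        raw_sink_bytes, rle_estimate_bytes
    );
}
```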