diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index ef9877011b5d..01f4bd06bab6 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -156,7 +156,8 @@ class ColumnarShuffleWriter[K, V]( conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt, tempDataFile.getAbsolutePath, localDirs, - GlutenConfig.get.columnarShuffleEnableDictionary + GlutenConfig.get.columnarShuffleEnableDictionary, + GlutenConfig.get.columnarShuffleEnableTypeAwareCompress ) nativeShuffleWriter = if (isSort) { diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 58dd301b6968..e16464b81110 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -143,6 +143,8 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/Spill.cc shuffle/Utils.cc utils/Compression.cc + utils/tac/FForCodec.cc + utils/tac/TypeAwareCompressCodec.cc utils/StringUtil.cc utils/ObjectStore.cc jni/JniError.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index c8cd3adef457..95c15abfa5ad 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -945,7 +945,8 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition jint shuffleFileBufferSize, jstring dataFileJstr, jstring localDirsJstr, - jboolean enableDictionary) { + jboolean enableDictionary, + jboolean enableTypeAwareCompress) { JNI_METHOD_START const auto ctx = getRuntime(env, wrapper); @@ -960,7 +961,8 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition mergeBufferSize, mergeThreshold, numSubDirs, - enableDictionary); + enableDictionary, + enableTypeAwareCompress); auto partitionWriter = std::make_shared( numPartitions, diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h 
index 113e5a3cfd2f..5f199ffb8e48 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.h +++ b/cpp/core/shuffle/LocalPartitionWriter.h @@ -54,6 +54,12 @@ class LocalPartitionWriter : public PartitionWriter { return arrow::Status::NotImplemented("Invalid code path for local shuffle writer."); } + bool enableTypeAwareCompress() const override { + // Type-aware compression is not compatible with dictionary encoding + // since it may change the buffer layout and types. + return options_->enableTypeAwareCompress && !options_->enableDictionary; + } + /// The stop function performs several tasks: /// 1. Opens the final data file. /// 2. Iterates over each partition ID (pid) to: diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 2139c6e9d724..1d7f9ad9f9c7 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -42,6 +42,7 @@ static constexpr int64_t kDefaultReadBufferSize = 1 << 20; static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20; static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10; static constexpr bool kDefaultEnableDictionary = false; +static constexpr bool kDefaultEnableTypeAwareCompress = false; enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle, kGpuHashShuffle }; @@ -175,6 +176,7 @@ struct LocalPartitionWriterOptions { int32_t numSubDirs = kDefaultNumSubDirs; // spark.diskStore.subDirectories bool enableDictionary = kDefaultEnableDictionary; + bool enableTypeAwareCompress = kDefaultEnableTypeAwareCompress; LocalPartitionWriterOptions() = default; @@ -185,14 +187,16 @@ struct LocalPartitionWriterOptions { int32_t mergeBufferSize, double mergeThreshold, int32_t numSubDirs, - bool enableDictionary) + bool enableDictionary, + bool enableTypeAwareCompress = kDefaultEnableTypeAwareCompress) : shuffleFileBufferSize(shuffleFileBufferSize), compressionBufferSize(compressionBufferSize), compressionThreshold(compressionThreshold), mergeBufferSize(mergeBufferSize), 
mergeThreshold(mergeThreshold), numSubDirs(numSubDirs), - enableDictionary(enableDictionary) {} + enableDictionary(enableDictionary), + enableTypeAwareCompress(enableTypeAwareCompress) {} }; struct RssPartitionWriterOptions { diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h index ebb86004cb88..1ebf7d66a7f9 100644 --- a/cpp/core/shuffle/PartitionWriter.h +++ b/cpp/core/shuffle/PartitionWriter.h @@ -71,6 +71,10 @@ class PartitionWriter : public Reclaimable { return payloadPool_->bytes_allocated(); } + virtual bool enableTypeAwareCompress() const { + return false; + } + protected: uint32_t numPartitions_; std::unique_ptr codec_; diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 27e7d9c85cbb..7c7282d9697b 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -26,6 +26,7 @@ #include "shuffle/Utils.h" #include "utils/Exception.h" #include "utils/Timer.h" +#include "utils/tac/TypeAwareCompressCodec.h" namespace gluten { namespace { @@ -36,6 +37,7 @@ static const Payload::Type kUncompressedType = gluten::BlockPayload::kUncompress static constexpr int64_t kZeroLengthBuffer = 0; static constexpr int64_t kNullBuffer = -1; static constexpr int64_t kUncompressedBuffer = -2; +static constexpr int64_t kTypeAwareBuffer = -3; template void write(uint8_t** dst, T data) { @@ -86,6 +88,51 @@ arrow::Result compressBuffer( return kCompressedBufferHeaderLength + compressedLength; } +// Type-aware buffer compression via TypeAwareCompressCodec. +// Same wire format as compressBuffer: +// kTypeAwareBuffer (int64) | uncompressedLength (int64) | compressedLength (int64) | compressed data +// If compressed size >= uncompressed size, falls back to kUncompressedBuffer (same as standard codec). 
+arrow::Result compressTypeAwareBuffer( + const std::shared_ptr& buffer, + uint8_t* output, + int64_t outputLength, + int8_t typeKind) { + auto outputPtr = &output; + if (!buffer) { + write(outputPtr, kNullBuffer); + return sizeof(int64_t); + } + if (buffer->size() == 0) { + write(outputPtr, kZeroLengthBuffer); + return sizeof(int64_t); + } + + static const int64_t kHeaderLength = 3 * sizeof(int64_t); // marker + uncompressedLen + compressedLen + if (outputLength < kHeaderLength + buffer->size()) { + return arrow::Status::Invalid("Output buffer too small for type-aware compression."); + } + auto* dataOutput = output + kHeaderLength; + auto availableOutput = outputLength - kHeaderLength; + + ARROW_ASSIGN_OR_RAISE( + auto compressedSize, + TypeAwareCompressCodec::compress(buffer->data(), buffer->size(), dataOutput, availableOutput, typeKind)); + + if (compressedSize >= buffer->size()) { + // Compression didn't help. Fall back to uncompressed, same as compressBuffer. + write(outputPtr, kUncompressedBuffer); + write(outputPtr, static_cast(buffer->size())); + memcpy(*outputPtr, buffer->data(), buffer->size()); + return 2 * sizeof(int64_t) + buffer->size(); + } + + write(outputPtr, kTypeAwareBuffer); + write(outputPtr, static_cast(buffer->size())); + write(outputPtr, static_cast(compressedSize)); + // compressed data already written at dataOutput by TypeAwareCompressCodec::compress. + return kHeaderLength + compressedSize; +} + arrow::Status compressAndFlush( const std::shared_ptr& buffer, arrow::io::OutputStream* outputStream, @@ -146,6 +193,24 @@ arrow::Result> readCompressedBuffer( int64_t uncompressedLength; RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &uncompressedLength)); + + if (compressedLength == kTypeAwareBuffer) { + // Type-aware compressed buffer. This marker only appears when compression helped. 
+ // Wire format: compressedLength (int64) already consumed above as kTypeAwareBuffer, + // then uncompressedLength (already read), then actualCompressedLen, then data. + int64_t actualCompressedLen; + RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &actualCompressedLen)); + ARROW_ASSIGN_OR_RAISE(auto compressed, arrow::AllocateResizableBuffer(actualCompressedLen, pool)); + RETURN_NOT_OK(inputStream->Read(actualCompressedLen, compressed->mutable_data())); + + timer.switchTo(&decompressTime); + ARROW_ASSIGN_OR_RAISE(auto output, arrow::AllocateResizableBuffer(uncompressedLength, pool)); + RETURN_NOT_OK(TypeAwareCompressCodec::decompress( + compressed->data(), actualCompressedLen, output->mutable_data(), uncompressedLength) + .status()); + return output; + } + if (compressedLength == kUncompressedBuffer) { ARROW_ASSIGN_OR_RAISE(auto uncompressed, arrow::AllocateResizableBuffer(uncompressedLength, pool)); RETURN_NOT_OK(inputStream->Read(uncompressedLength, uncompressed->mutable_data())); @@ -185,25 +250,38 @@ arrow::Result> BlockPayload::fromBuffers( std::vector> buffers, const std::vector* isValidityBuffer, arrow::MemoryPool* pool, - arrow::util::Codec* codec) { + arrow::util::Codec* codec, + const std::vector* bufferTypes) { const uint32_t numBuffers = buffers.size(); if (payloadType == Payload::Type::kCompressed) { Timer compressionTime; compressionTime.start(); - // Compress. - auto maxLength = maxCompressedLength(buffers, codec); - std::shared_ptr compressedBuffer; + // Compute max compressed length, accounting for type-aware compression where applicable. + auto maxLength = maxCompressedLength(buffers, codec, bufferTypes); + + std::shared_ptr compressedBuffer; ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool)); auto* output = compressedBuffer->mutable_data(); int64_t actualLength = 0; // Compress buffers one by one. 
- for (auto& buffer : buffers) { + for (size_t i = 0; i < buffers.size(); ++i) { auto availableLength = maxLength - actualLength; - // Release buffer after compression. - ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec)); + auto typeKind = + (bufferTypes != nullptr && i < bufferTypes->size()) ? (*bufferTypes)[i] : tac::kUnsupported; + + int64_t compressedSize = 0; + if (TypeAwareCompressCodec::support(typeKind)) { + // Use type-aware compression for supported types. + ARROW_ASSIGN_OR_RAISE( + compressedSize, compressTypeAwareBuffer(std::move(buffers[i]), output, availableLength, typeKind)); + } else { + // Use standard codec (LZ4/ZSTD) for unsupported types. + ARROW_ASSIGN_OR_RAISE( + compressedSize, compressBuffer(std::move(buffers[i]), output, availableLength, codec)); + } output += compressedSize; actualLength += compressedSize; } @@ -327,16 +405,29 @@ int64_t BlockPayload::rawSize() { int64_t BlockPayload::maxCompressedLength( const std::vector>& buffers, - arrow::util::Codec* codec) { + arrow::util::Codec* codec, + const std::vector* bufferTypes) { // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ... - const auto metadataLength = sizeof(int64_t) * 2 * buffers.size(); - int64_t totalCompressedLength = - std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) { - if (!buffer) { - return sum; - } - return sum + codec->MaxCompressedLen(buffer->size(), buffer->data()); - }); + int64_t metadataLength = sizeof(int64_t) * 2 * buffers.size(); + int64_t totalCompressedLength = 0; + for (size_t i = 0; i < buffers.size(); ++i) { + const auto& buffer = buffers[i]; + if (!buffer) { + continue; + } + if (bufferTypes != nullptr && i < bufferTypes->size()) { + auto typeKind = (*bufferTypes)[i]; + if (TypeAwareCompressCodec::support(typeKind)) { + // Type-aware compressed buffer has an extra int64 marker to indicate type-aware compression. 
+ // buffer layout: | kTypeAwareBuffer (int64) | buffer 1 uncompressedLength | buffer 1 compressedLength | buffer 1 | ... + metadataLength += sizeof(int64_t); + totalCompressedLength += TypeAwareCompressCodec::maxCompressedLen(buffer->size(), typeKind); + continue; + } + } + // Standard codec: compressed data. + totalCompressedLength += codec->MaxCompressedLen(buffer->size(), buffer->data()); + } return metadataLength + totalCompressedLength; } @@ -413,12 +504,14 @@ arrow::Result> InMemoryPayload::merge( } } } - return std::make_unique(mergedRows, isValidityBuffer, source->schema(), std::move(merged)); + return std::make_unique( + mergedRows, isValidityBuffer, source->schema(), std::move(merged), false, source->bufferTypes_); } arrow::Result> InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) { - return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec); + return BlockPayload::fromBuffers( + payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, bufferTypes_); } arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) { diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index dfe98cafd563..eb130c637586 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -79,7 +79,8 @@ class BlockPayload final : public Payload { std::vector> buffers, const std::vector* isValidityBuffer, arrow::MemoryPool* pool, - arrow::util::Codec* codec); + arrow::util::Codec* codec, + const std::vector* bufferTypes = nullptr); static arrow::Result>> deserialize( arrow::io::InputStream* inputStream, @@ -91,7 +92,8 @@ class BlockPayload final : public Payload { static int64_t maxCompressedLength( const std::vector>& buffers, - arrow::util::Codec* codec); + arrow::util::Codec* codec, + const std::vector* bufferTypes = nullptr); arrow::Status serialize(arrow::io::OutputStream* outputStream) override; @@ 
-121,11 +123,13 @@ class InMemoryPayload final : public Payload { const std::vector* isValidityBuffer, const std::shared_ptr& schema, std::vector> buffers, - bool hasComplexType = false) + bool hasComplexType = false, + const std::vector* bufferTypes = nullptr) : Payload(Type::kUncompressed, numRows, isValidityBuffer), schema_(schema), buffers_(std::move(buffers)), - hasComplexType_(hasComplexType) {} + hasComplexType_(hasComplexType), + bufferTypes_(bufferTypes) {} static arrow::Result> merge(std::unique_ptr source, std::unique_ptr append, arrow::MemoryPool* pool); @@ -155,6 +159,7 @@ class InMemoryPayload final : public Payload { std::shared_ptr schema_; std::vector> buffers_; bool hasComplexType_; + const std::vector* bufferTypes_; }; class UncompressedDiskBlockPayload final : public Payload { diff --git a/cpp/core/tests/CMakeLists.txt b/cpp/core/tests/CMakeLists.txt index 5bd34c77360d..33026948ce54 100644 --- a/cpp/core/tests/CMakeLists.txt +++ b/cpp/core/tests/CMakeLists.txt @@ -16,3 +16,4 @@ add_test_case(round_robin_partitioner_test SOURCES RoundRobinPartitionerTest.cc) add_test_case(object_store_test SOURCES ObjectStoreTest.cc) add_test_case(memory_allocator_test SOURCES MemoryAllocatorTest.cc) +add_test_case(ffor_codec_test SOURCES FForCodecTest.cc) diff --git a/cpp/core/tests/FForCodecTest.cc b/cpp/core/tests/FForCodecTest.cc new file mode 100644 index 000000000000..3c0f15632821 --- /dev/null +++ b/cpp/core/tests/FForCodecTest.cc @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/tac/FForCodec.h" +#include "utils/tac/TypeAwareCompressCodec.h" +#include "utils/tac/ffor.hpp" + +#include +#include +#include +#include + +using namespace gluten::ffor; +using namespace gluten; + +namespace { + +// Some non-TAC type values for negative testing. +static constexpr int8_t kSomeUnsupportedType = 99; + +std::vector genData(size_t n, uint64_t base, uint64_t range, uint64_t seed = 42) { + std::mt19937_64 rng(seed); + std::uniform_int_distribution dist(0, range); + std::vector data(n); + for (size_t i = 0; i < n; ++i) { + data[i] = base + dist(rng); + } + return data; +} + +std::vector padToLanes(const std::vector& data) { + size_t padded = (data.size() + kLanes - 1) / kLanes * kLanes; + auto result = data; + result.resize(padded, data.empty() ? 
0 : data.back()); + return result; +} + +template +void roundtripTest(const uint64_t* data, size_t n, uint64_t base) { + size_t nPadded = (n + kLanes - 1) / kLanes * kLanes; + size_t compN = compressedWords(nPadded, BW); + + std::vector encoded(compN + kLanes, 0xDEADBEEFDEADBEEF); + std::vector decoded(nPadded, 0xDEADBEEFDEADBEEF); + + encode(data, encoded.data(), base, nPadded); + decode(encoded.data(), decoded.data(), base, nPadded); + + for (size_t i = 0; i < n; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i; + } +} + +void roundtripTestRt(const uint64_t* data, size_t n, uint64_t base, unsigned bw) { + size_t nPadded = (n + kLanes - 1) / kLanes * kLanes; + size_t compN = compressedWords(nPadded, bw); + + std::vector encoded(compN + kLanes, 0); + std::vector decoded(nPadded, 0); + + encodeRt(data, encoded.data(), base, nPadded, bw); + decodeRt(encoded.data(), decoded.data(), base, nPadded, bw); + + for (size_t i = 0; i < n; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i; + } +} + +void compressRoundtrip(const uint64_t* data, size_t num) { + std::vector buf(compress64Bound(num)); + size_t written = compress64(data, num, buf.data()); + + std::vector decoded(num); + size_t nDecoded = decompress64(buf.data(), written, decoded.data()); + + ASSERT_EQ(nDecoded, num); + for (size_t i = 0; i < num; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i; + } +} + +} // namespace + +// Low-level encode/decode tests + +TEST(FForTest, Bw0Constant) { + std::vector data(256, 12345); + roundtripTest<0>(data.data(), data.size(), 12345); +} + +TEST(FForTest, Bw1Binary) { + auto data = padToLanes(genData(256, 100, 1)); + uint64_t base; + unsigned bw; + analyze(data.data(), data.size(), base, bw); + ASSERT_EQ(bw, 1u); + roundtripTest<1>(data.data(), data.size(), base); +} + +TEST(FForTest, Bw6Narrow) { + auto data = padToLanes(genData(1024, 1000, 63)); + roundtripTest<6>(data.data(), data.size(), 1000); +} + +TEST(FForTest, 
Bw16Medium) { + auto data = padToLanes(genData(1024, 50000, 65535)); + roundtripTest<16>(data.data(), data.size(), 50000); +} + +TEST(FForTest, Bw32Wide) { + auto data = padToLanes(genData(512, 1000000, (1ULL << 32) - 1)); + roundtripTest<32>(data.data(), data.size(), 1000000); +} + +TEST(FForTest, Bw64FullRange) { + auto data = padToLanes(genData(256, 0, UINT64_MAX)); + roundtripTest<64>(data.data(), data.size(), 0); +} + +TEST(FForTest, AllBitwidthsSmall) { + for (unsigned bw = 0; bw <= 64; ++bw) { + uint64_t range = (bw == 0) ? 0 : (bw == 64) ? UINT64_MAX : ((1ULL << bw) - 1); + auto data = padToLanes(genData(64, 42, range, 100 + bw)); + roundtripTestRt(data.data(), data.size(), 42, bw); + } +} + +TEST(FForTest, AllBitwidthsLarge) { + for (unsigned bw = 0; bw <= 64; ++bw) { + uint64_t range = (bw == 0) ? 0 : (bw == 64) ? UINT64_MAX : ((1ULL << bw) - 1); + auto data = padToLanes(genData(4096, 42, range, 200 + bw)); + roundtripTestRt(data.data(), data.size(), 42, bw); + } +} + +TEST(FForTest, VariousSizes) { + for (size_t n : {4, 8, 12, 16, 20, 28, 32, 60, 64, 100, 128, 255, 256, 500, 1000, 1024, 4096}) { + auto data = padToLanes(genData(n, 100, 255, n)); + roundtripTest<8>(data.data(), (n + kLanes - 1) / kLanes * kLanes, 100); + } +} + +TEST(FForTest, MinSize) { + uint64_t data[4] = {10, 11, 12, 13}; + roundtripTest<4>(data, 4, 10); +} + +TEST(FForTest, AllSame) { + std::vector data(1024, 999999); + roundtripTest<0>(data.data(), data.size(), 999999); +} + +TEST(FForTest, Sequential) { + std::vector data(1024); + for (size_t i = 0; i < 1024; ++i) + data[i] = 1000 + i; + uint64_t base; + unsigned bw; + analyze(data.data(), data.size(), base, bw); + ASSERT_EQ(base, uint64_t(1000)); + ASSERT_EQ(bw, 10u); + roundtripTest<10>(data.data(), data.size(), base); +} + +TEST(FForTest, LargeBase) { + uint64_t largeBase = UINT64_MAX - 1000; + auto data = padToLanes(genData(256, largeBase, 100)); + roundtripTest<7>(data.data(), data.size(), largeBase); +} + +TEST(FForTest, 
AnalyzeCorrectness) { + uint64_t data1[] = {5, 5, 5, 5}; + uint64_t b; + unsigned w; + analyze(data1, 4, b, w); + ASSERT_EQ(b, uint64_t(5)); + ASSERT_EQ(w, 0u); + + uint64_t data2[] = {10, 11, 10, 11}; + analyze(data2, 4, b, w); + ASSERT_EQ(b, uint64_t(10)); + ASSERT_EQ(w, 1u); + + uint64_t data3[] = {0, 255, 128, 64}; + analyze(data3, 4, b, w); + ASSERT_EQ(b, uint64_t(0)); + ASSERT_EQ(w, 8u); +} + +TEST(FForTest, CompressedSize) { + ASSERT_EQ(compressedWords(256, 6), size_t(24)); + ASSERT_EQ(compressedWords(256, 1), size_t(4)); + ASSERT_EQ(compressedWords(256, 64), size_t(256)); + ASSERT_EQ(compressedWords(256, 0), size_t(0)); +} + +// compress64 / decompress64 tests + +TEST(FForTest, Compress64Basic) { + auto data = genData(256, 1000, 99); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64WithTail1) { + auto data = genData(5, 100, 50); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64WithTail2) { + auto data = genData(6, 100, 50); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64WithTail3) { + auto data = genData(7, 100, 50); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64ExactLanes) { + auto data = genData(4, 100, 50); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64OnlyTail) { + for (size_t n = 1; n <= 3; ++n) { + auto data = genData(n, 42, 10); + compressRoundtrip(data.data(), data.size()); + } +} + +TEST(FForTest, Compress64Large) { + auto data = genData(10000, 5000, 255); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64LargeWithTail) { + auto data = genData(10001, 5000, 255); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64AllSame) { + std::vector data(128, 42); + compressRoundtrip(data.data(), data.size()); +} + +TEST(FForTest, Compress64FullRange) { + auto data = genData(256, 0, UINT64_MAX); + compressRoundtrip(data.data(), data.size()); +} + 
+TEST(FForTest, Compress64SizeCheck) { + auto narrow = genData(256, 1000, 63); // bw=6 + std::vector buf(compress64Bound(256)); + size_t written = compress64(narrow.data(), narrow.size(), buf.data()); + + // block header(16) + packed data + tail header(16) + size_t expected = kHeaderSize + compressedWords(256, 6) * sizeof(uint64_t) + kHeaderSize; + ASSERT_EQ(written, expected); + + size_t raw = 256 * sizeof(uint64_t); + double ratio = double(raw) / double(written); + ASSERT_GT(ratio, 9.0) << "Ratio too low: " << ratio; +} + +TEST(FForTest, Compress64AllSizes1To20) { + for (size_t n = 1; n <= 20; ++n) { + auto data = genData(n, 100, 200, n * 7); + compressRoundtrip(data.data(), data.size()); + } +} + +// OOB read test — decode() reads past the end of the compressed buffer on the +// last group when newBitPos hits a 64-bit boundary. To detect this, we place +// the compressed buffer at the end of an mmap'd page with a PROT_NONE guard +// page immediately after, so any OOB read causes a SIGSEGV. +// +// Example: BW=32, 8 values (2 groups of 4). compressedWords = 4. +// decode pre-loads in[0..3]. After group 1: newBitPos=64, overflow=0, +// the else branch loads in[4..7] — 4 words past end of 4-word buffer. +#if defined(__linux__) || defined(__APPLE__) +#include +#include + +// Allocate `size` bytes at the END of a page, with a guard page after. +// Returns {base_ptr (to munmap), usable_ptr, total_mmap_size}. +static std::tuple allocAtPageEnd(size_t size) { + long pageSize = sysconf(_SC_PAGESIZE); + // Round up to cover `size` bytes + 1 guard page. + size_t dataPages = (size + pageSize - 1) / pageSize; + size_t totalSize = (dataPages + 1) * pageSize; // +1 for guard + void* base = mmap(nullptr, totalSize, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + EXPECT_NE(base, MAP_FAILED); + // Make the last page a guard (no access). 
+ void* guardPage = static_cast(base) + dataPages * pageSize; + mprotect(guardPage, pageSize, PROT_NONE); + // Return pointer to the last `size` bytes before the guard page. + auto* usable = static_cast(guardPage) - size; + return {base, usable, totalSize}; +} + +static void freePageEnd(void* base, size_t totalSize) { + munmap(base, totalSize); +} + +// BW=32, nValues=8: clean 64-bit boundary on last group, triggers OOB pre-load. +TEST(FForTest, DecodeBw32OobGuardPage) { + constexpr unsigned BW = 32; + constexpr size_t N = 8; // 2 groups of 4 + uint64_t data[N]; + for (size_t i = 0; i < N; ++i) { + data[i] = 1000 + i; + } + + // Encode into exact-size buffer (no padding). + size_t compN = compressedWords(N, BW); + size_t compBytes = compN * sizeof(uint64_t); + auto [encBase, encBuf, encTotalSize] = allocAtPageEnd(compBytes); + auto* encPtr = reinterpret_cast(encBuf); + encode(data, encPtr, 1000, N); + + // Decode from the exact-size buffer at page end — OOB read hits guard page. + uint64_t decoded[N] = {}; + decode(encPtr, decoded, 1000, N); + + for (size_t i = 0; i < N; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at i=" << i; + } + freePageEnd(encBase, encTotalSize); +} + +// BW=16, nValues=16: newBitPos=64 with overflow=0 on groups 3,7,11,15. +// Last group (g=3) triggers OOB. 
+TEST(FForTest, DecodeBw16OobGuardPage) { + constexpr unsigned BW = 16; + constexpr size_t N = 16; // 4 groups of 4 + uint64_t data[N]; + for (size_t i = 0; i < N; ++i) { + data[i] = 50000 + i; + } + + size_t compN = compressedWords(N, BW); + size_t compBytes = compN * sizeof(uint64_t); + auto [encBase, encBuf, encTotalSize] = allocAtPageEnd(compBytes); + auto* encPtr = reinterpret_cast(encBuf); + encode(data, encPtr, 50000, N); + + uint64_t decoded[N] = {}; + decode(encPtr, decoded, 50000, N); + + for (size_t i = 0; i < N; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at i=" << i; + } + freePageEnd(encBase, encTotalSize); +} + +// BW=7, nValues=256: overflow > 0 on last group, triggers OOB in the +// "load next words" branch (lines 177-180). +TEST(FForTest, DecodeBw7OobGuardPage) { + constexpr unsigned BW = 7; + constexpr size_t N = 256; + auto data = padToLanes(genData(N, 1000, 99)); + + size_t compN = compressedWords(N, BW); + size_t compBytes = compN * sizeof(uint64_t); + auto [encBase, encBuf, encTotalSize] = allocAtPageEnd(compBytes); + auto* encPtr = reinterpret_cast(encBuf); + encode(data.data(), encPtr, 1000, N); + + uint64_t decoded[N] = {}; + decode(encPtr, decoded, 1000, N); + + for (size_t i = 0; i < N; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "Mismatch at i=" << i; + } + freePageEnd(encBase, encTotalSize); +} + +#endif // __linux__ || __APPLE__ + +// Misalignment tests — verify compress64/decompress64 handle unaligned pointers. 
+ +TEST(FForTest, Compress64MisalignedOutput) { + auto data = genData(256, 1000, 99); + std::vector buf(compress64Bound(256) + 16); + + for (size_t offset = 0; offset < 8; ++offset) { + uint8_t* out = buf.data() + offset; + size_t written = compress64(data.data(), data.size(), out); + + std::vector decoded(256); + size_t n = decompress64(out, written, decoded.data()); + ASSERT_EQ(n, size_t(256)); + for (size_t i = 0; i < 256; ++i) { + ASSERT_EQ(decoded[i], data[i]) << "offset=" << offset << " i=" << i; + } + } +} + +TEST(FForTest, Compress64MisalignedInput) { + auto raw = genData(256, 1000, 99); + std::vector inputBuf(256 * sizeof(uint64_t) + 16); + + for (size_t offset = 0; offset < 8; ++offset) { + std::memcpy(inputBuf.data() + offset, raw.data(), 256 * sizeof(uint64_t)); + const auto* misalignedInput = reinterpret_cast(inputBuf.data() + offset); + + std::vector comp(compress64Bound(256)); + size_t written = compress64(misalignedInput, 256, comp.data()); + + std::vector decoded(256); + size_t n = decompress64(comp.data(), written, decoded.data()); + ASSERT_EQ(n, size_t(256)); + for (size_t i = 0; i < 256; ++i) { + ASSERT_EQ(decoded[i], raw[i]) << "offset=" << offset << " i=" << i; + } + } +} + +TEST(FForTest, Decompress64MisalignedOutput) { + auto data = genData(256, 1000, 99); + std::vector comp(compress64Bound(256)); + size_t written = compress64(data.data(), data.size(), comp.data()); + + std::vector outBuf(256 * sizeof(uint64_t) + 16); + for (size_t offset = 0; offset < 8; ++offset) { + auto* misalignedOutput = reinterpret_cast(outBuf.data() + offset); + size_t n = decompress64(comp.data(), written, misalignedOutput); + ASSERT_EQ(n, size_t(256)); + for (size_t i = 0; i < 256; ++i) { + uint64_t val; + std::memcpy(&val, reinterpret_cast(misalignedOutput) + i * sizeof(uint64_t), sizeof(val)); + ASSERT_EQ(val, data[i]) << "offset=" << offset << " i=" << i; + } + } +} + +TEST(FForTest, Compress64AllMisaligned) { + auto raw = genData(256, 1000, 99); + std::vector 
inputBuf(256 * sizeof(uint64_t) + 16);
+  std::vector<uint8_t> compBuf(compress64Bound(256) + 16);
+  std::vector<uint8_t> outBuf(256 * sizeof(uint64_t) + 16);
+
+  for (size_t inOff = 1; inOff < 8; inOff += 3) {
+    for (size_t compOff = 1; compOff < 8; compOff += 3) {
+      for (size_t outOff = 1; outOff < 8; outOff += 3) {
+        std::memcpy(inputBuf.data() + inOff, raw.data(), 256 * sizeof(uint64_t));
+        const auto* inPtr = reinterpret_cast<const uint64_t*>(inputBuf.data() + inOff);
+
+        size_t written = compress64(inPtr, 256, compBuf.data() + compOff);
+
+        auto* outPtr = reinterpret_cast<uint64_t*>(outBuf.data() + outOff);
+        size_t n = decompress64(compBuf.data() + compOff, written, outPtr);
+        ASSERT_EQ(n, size_t(256));
+        for (size_t i = 0; i < 256; ++i) {
+          uint64_t val;
+          std::memcpy(&val, reinterpret_cast<const uint8_t*>(outPtr) + i * sizeof(uint64_t), sizeof(val));
+          ASSERT_EQ(val, raw[i]) << "inOff=" << inOff << " compOff=" << compOff << " outOff=" << outOff << " i=" << i;
+        }
+      }
+    }
+  }
+}
+
+// FForCodec wrapper tests
+
+TEST(FForCodecTest, CompressDecompressRoundtrip) {
+  auto data = genData(1024, 5000, 255);
+  int64_t inputSize = data.size() * sizeof(uint64_t);
+
+  auto maxLen = FForCodec::maxCompressedLength(inputSize);
+  std::vector<uint8_t> compressed(maxLen);
+
+  auto compResult = FForCodec::compress(
+      reinterpret_cast<const uint8_t*>(data.data()), inputSize, compressed.data(), maxLen);
+  ASSERT_TRUE(compResult.ok()) << compResult.status().ToString();
+  auto compressedSize = *compResult;
+  ASSERT_GT(compressedSize, 0);
+  ASSERT_LT(compressedSize, inputSize);
+
+  std::vector<uint64_t> decoded(data.size());
+  auto decResult = FForCodec::decompress(
+      compressed.data(), compressedSize, reinterpret_cast<uint8_t*>(decoded.data()), inputSize);
+  ASSERT_TRUE(decResult.ok()) << decResult.status().ToString();
+
+  for (size_t i = 0; i < data.size(); ++i) {
+    ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i;
+  }
+}
+
+TEST(FForCodecTest, EmptyInput) {
+  auto result = FForCodec::compress(nullptr, 0, nullptr, 0);
+  ASSERT_TRUE(result.ok());
+  ASSERT_EQ(*result, 0);
+}
+
+TEST(FForCodecTest, InvalidInputSize) {
+  uint8_t dummy[7] = {};
+  auto result = FForCodec::compress(dummy, 7, dummy, 100);
+  ASSERT_FALSE(result.ok());
+}
+
+// Full-range random data: bw=64, FFOR can't compress below raw size.
+// This exercises the fallback path in compressTypeAwareBuffer where
+// compressed size >= uncompressed size and kUncompressedBuffer is used.
+TEST(FForCodecTest, FullRangeDataRoundtrip) {
+  auto data = genData(256, 0, UINT64_MAX);
+  int64_t inputSize = data.size() * sizeof(uint64_t);
+
+  auto maxLen = FForCodec::maxCompressedLength(inputSize);
+  std::vector<uint8_t> compressed(maxLen);
+
+  auto compResult = FForCodec::compress(
+      reinterpret_cast<const uint8_t*>(data.data()), inputSize, compressed.data(), maxLen);
+  ASSERT_TRUE(compResult.ok()) << compResult.status().ToString();
+  auto compressedSize = *compResult;
+  // Full-range data: compressed >= raw (FFOR adds overhead at bw=64).
+  ASSERT_GE(compressedSize, inputSize);
+
+  std::vector<uint64_t> decoded(data.size());
+  auto decResult = FForCodec::decompress(
+      compressed.data(), compressedSize, reinterpret_cast<uint8_t*>(decoded.data()), inputSize);
+  ASSERT_TRUE(decResult.ok()) << decResult.status().ToString();
+
+  for (size_t i = 0; i < data.size(); ++i) {
+    ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i;
+  }
+}
+
+// TypeAwareCompressCodec roundtrip tests
+
+TEST(TypeAwareCompressCodecTest, SupportedTypes) {
+  // Supported TAC types.
+  ASSERT_TRUE(TypeAwareCompressCodec::support(tac::kUInt64));
+
+  // Not supported.
+  ASSERT_FALSE(TypeAwareCompressCodec::support(tac::kUnsupported));
+  ASSERT_FALSE(TypeAwareCompressCodec::support(kSomeUnsupportedType));
+}
+
+TEST(TypeAwareCompressCodecTest, NarrowDataRoundtrip) {
+  // Narrow range data: compresses well.
+  auto data = genData(1024, 5000, 255);
+  int64_t inputSize = data.size() * sizeof(uint64_t);
+
+  auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt64);
+  std::vector<uint8_t> compressed(maxLen);
+
+  auto compResult = TypeAwareCompressCodec::compress(
+      reinterpret_cast<const uint8_t*>(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt64);
+  ASSERT_TRUE(compResult.ok()) << compResult.status().ToString();
+  auto compressedSize = *compResult;
+  ASSERT_GT(compressedSize, 0);
+  ASSERT_LT(compressedSize, inputSize);
+
+  std::vector<uint64_t> decoded(data.size());
+  auto decResult = TypeAwareCompressCodec::decompress(
+      compressed.data(), compressedSize, reinterpret_cast<uint8_t*>(decoded.data()), inputSize);
+  ASSERT_TRUE(decResult.ok()) << decResult.status().ToString();
+
+  for (size_t i = 0; i < data.size(); ++i) {
+    ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i;
+  }
+}
+
+// Full-range random data through TypeAwareCompressCodec.
+// FFOR produces output >= input size. The caller (compressTypeAwareBuffer) would
+// fall back to kUncompressedBuffer, but TypeAwareCompressCodec itself still
+// produces valid (just large) output that roundtrips correctly.
+TEST(TypeAwareCompressCodecTest, FullRangeDataRoundtrip) {
+  auto data = genData(256, 0, UINT64_MAX);
+  int64_t inputSize = data.size() * sizeof(uint64_t);
+
+  auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt64);
+  std::vector<uint8_t> compressed(maxLen);
+
+  auto compResult = TypeAwareCompressCodec::compress(
+      reinterpret_cast<const uint8_t*>(data.data()), inputSize, compressed.data(), maxLen, tac::kUInt64);
+  ASSERT_TRUE(compResult.ok()) << compResult.status().ToString();
+  auto compressedSize = *compResult;
+  // Compressed size >= input because full-range data can't be compressed.
+  ASSERT_GE(compressedSize, inputSize);
+
+  std::vector<uint64_t> decoded(data.size());
+  auto decResult = TypeAwareCompressCodec::decompress(
+      compressed.data(), compressedSize, reinterpret_cast<uint8_t*>(decoded.data()), inputSize);
+  ASSERT_TRUE(decResult.ok()) << decResult.status().ToString();
+
+  for (size_t i = 0; i < data.size(); ++i) {
+    ASSERT_EQ(decoded[i], data[i]) << "Mismatch at index " << i;
+  }
+}
+
+TEST(TypeAwareCompressCodecTest, DoubleTypeRoundtrip) {
+  // Doubles reinterpreted as uint64 — exercises the codec with DOUBLE type.
+  std::vector<double> doubles(512);
+  std::mt19937_64 rng(99);
+  std::uniform_real_distribution<double> dist(1000.0, 1001.0); // narrow range
+  for (auto& d : doubles) {
+    d = dist(rng);
+  }
+
+  int64_t inputSize = doubles.size() * sizeof(double);
+  auto maxLen = TypeAwareCompressCodec::maxCompressedLen(inputSize, tac::kUInt64);
+  std::vector<uint8_t> compressed(maxLen);
+
+  auto compResult = TypeAwareCompressCodec::compress(
+      reinterpret_cast<const uint8_t*>(doubles.data()), inputSize, compressed.data(), maxLen, tac::kUInt64);
+  ASSERT_TRUE(compResult.ok()) << compResult.status().ToString();
+
+  std::vector<double> decoded(doubles.size());
+  auto decResult = TypeAwareCompressCodec::decompress(
+      compressed.data(), *compResult, reinterpret_cast<uint8_t*>(decoded.data()), inputSize);
+  ASSERT_TRUE(decResult.ok()) << decResult.status().ToString();
+
+  for (size_t i = 0; i < doubles.size(); ++i) {
+    ASSERT_EQ(
+        *reinterpret_cast<const uint64_t*>(&decoded[i]),
+        *reinterpret_cast<const uint64_t*>(&doubles[i]))
+        << "Mismatch at index " << i;
+  }
+}
+
+TEST(TypeAwareCompressCodecTest, EmptyInput) {
+  auto result = TypeAwareCompressCodec::compress(nullptr, 0, nullptr, 0, tac::kUInt64);
+  ASSERT_TRUE(result.ok());
+  ASSERT_EQ(*result, 0);
+}
+
+TEST(TypeAwareCompressCodecTest, UnsupportedType) {
+  uint8_t dummy[8] = {};
+  auto result = TypeAwareCompressCodec::compress(dummy, 8, dummy, 100, kSomeUnsupportedType);
+  ASSERT_FALSE(result.ok());
+}
diff --git a/cpp/core/utils/tac/FForCodec.cc b/cpp/core/utils/tac/FForCodec.cc
new file mode
100644
index 000000000000..ec079662f92e
--- /dev/null
+++ b/cpp/core/utils/tac/FForCodec.cc
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/tac/FForCodec.h"
+#include "utils/tac/ffor.hpp"
+
+namespace gluten {
+
+int64_t FForCodec::maxCompressedLength(int64_t inputSize) {
+  size_t numValues = inputSize / sizeof(uint64_t);
+  return static_cast<int64_t>(ffor::compress64Bound(numValues));
+}
+
+arrow::Result<int64_t>
+FForCodec::compress(const uint8_t* input, int64_t inputSize, uint8_t* output, int64_t outputSize) {
+  if (inputSize == 0) {
+    return 0;
+  }
+  if (inputSize % sizeof(uint64_t) != 0) {
+    return arrow::Status::Invalid("FForCodec: input size ", inputSize, " is not a multiple of 8.");
+  }
+
+  size_t numValues = inputSize / sizeof(uint64_t);
+  auto maxLen = static_cast<int64_t>(ffor::compress64Bound(numValues));
+  if (outputSize < maxLen) {
+    return arrow::Status::Invalid("FForCodec: output buffer too small.");
+  }
+
+  auto written = ffor::compress64(reinterpret_cast<const uint64_t*>(input), numValues, output);
+  return static_cast<int64_t>(written);
+}
+
+arrow::Result<int64_t>
+FForCodec::decompress(const uint8_t* input, int64_t inputSize, uint8_t* output, int64_t outputSize) {
+  if (outputSize == 0) {
+    return 0;
+  }
+  if (outputSize % sizeof(uint64_t) != 0) {
+    return arrow::Status::Invalid("FForCodec: output size ", outputSize, " is not a multiple of 8.");
+  }
+
+  auto nDecoded = ffor::decompress64(input, inputSize, reinterpret_cast<uint64_t*>(output));
+  return static_cast<int64_t>(nDecoded);
+}
+
+} // namespace gluten
diff --git a/cpp/core/utils/tac/FForCodec.h b/cpp/core/utils/tac/FForCodec.h
new file mode 100644
index 000000000000..b91a13860a32
--- /dev/null
+++ b/cpp/core/utils/tac/FForCodec.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+namespace gluten {
+
+// FFOR (Frame-of-Reference) codec for uint64_t data using 4-lane layout.
+// Used for INT64/UINT64 columns in shuffle.
+class FForCodec {
+ public:
+  // Returns the maximum compressed size in bytes for the given input size.
+  static int64_t maxCompressedLength(int64_t inputSize);
+
+  // Compress uint64_t data.
+  // inputSize must be a multiple of 8 (sizeof(uint64_t)).
+  // Returns the number of compressed bytes written to output.
+  static arrow::Result<int64_t> compress(const uint8_t* input, int64_t inputSize, uint8_t* output, int64_t outputSize);
+
+  // Decompress data compressed by compress().
+  // outputSize must be a multiple of 8 (sizeof(uint64_t)).
+  // Returns the number of uint64_t values decoded.
+  static arrow::Result<int64_t>
+  decompress(const uint8_t* input, int64_t inputSize, uint8_t* output, int64_t outputSize);
+};
+
+} // namespace gluten
diff --git a/cpp/core/utils/tac/TypeAwareCompressCodec.cc b/cpp/core/utils/tac/TypeAwareCompressCodec.cc
new file mode 100644
index 000000000000..2362f999b577
--- /dev/null
+++ b/cpp/core/utils/tac/TypeAwareCompressCodec.cc
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/tac/TypeAwareCompressCodec.h"
+#include "utils/tac/FForCodec.h"
+
+namespace gluten {
+
+bool TypeAwareCompressCodec::support(int8_t tacType) {
+  return tacType == tac::kUInt64;
+}
+
+int64_t TypeAwareCompressCodec::maxCompressedLen(int64_t inputLen, int8_t tacType) {
+  if (!support(tacType)) {
+    return 0;
+  }
+  return kPayloadHeaderSize + FForCodec::maxCompressedLength(inputLen);
+}
+
+arrow::Result<int64_t> TypeAwareCompressCodec::compress(
+    const uint8_t* input,
+    int64_t inputLen,
+    uint8_t* output,
+    int64_t outputLen,
+    int8_t tacType) {
+  if (!support(tacType)) {
+    return arrow::Status::Invalid("Type-aware compression not supported for tac type: ", static_cast<int32_t>(tacType));
+  }
+  if (inputLen == 0) {
+    return 0;
+  }
+  if (outputLen < kPayloadHeaderSize) {
+    return arrow::Status::Invalid("Output buffer too small for type-aware compression.");
+  }
+
+  auto* out = output;
+  *out++ = static_cast<uint8_t>(CodecId::kFFor);
+  *out++ = static_cast<uint8_t>(tacType);
+
+  auto availableOutput = outputLen - kPayloadHeaderSize;
+  ARROW_ASSIGN_OR_RAISE(auto compressedLen, FForCodec::compress(input, inputLen, out, availableOutput));
+
+  return kPayloadHeaderSize + compressedLen;
+}
+
+arrow::Result<int64_t>
+TypeAwareCompressCodec::decompress(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen) {
+  if (inputLen < kPayloadHeaderSize) {
+    return arrow::Status::Invalid("Input too small for type-aware decompress header.");
+  }
+
+  auto* in = input;
+  auto codecId = static_cast<CodecId>(*in++);
+  [[maybe_unused]] auto tacType = *in++;
+  auto dataLen = inputLen - kPayloadHeaderSize;
+
+  switch (codecId) {
+    case CodecId::kFFor: {
+      ARROW_ASSIGN_OR_RAISE(auto nDecoded, FForCodec::decompress(in, dataLen, output, outputLen));
+      (void)nDecoded;
+      return inputLen;
+    }
+    default:
+      return arrow::Status::Invalid("Unknown type-aware codec ID: ", static_cast<int32_t>(codecId));
+  }
+}
+
+} // namespace gluten
diff --git a/cpp/core/utils/tac/TypeAwareCompressCodec.h b/cpp/core/utils/tac/TypeAwareCompressCodec.h
new file mode 100644
index 000000000000..6955525d2e44
--- /dev/null
+++ b/cpp/core/utils/tac/TypeAwareCompressCodec.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+namespace gluten {
+namespace tac {
+
+/// Type identifiers for type-aware compression.
+/// Independent of any external type system (Arrow, Velox, etc.).
+/// Backend-specific code converts to/from these types.
+enum TacDataType : int8_t {
+  kUnsupported = -1, // Not compressible by TAC.
+  kUInt64 = 0, // 8-byte unsigned integer (also used for int64, double, date64).
+};
+
+} // namespace tac
+
+/// TypeAwareCompressCodec provides type-aware compression that selects the best
+/// compression algorithm based on the data type of the buffer.
+///
+/// Currently supported:
+///   kUInt64 -> FFor (Frame-of-Reference + Bit-Packing) for uint64_t streams.
+///
+/// The compressed wire format is self-describing: decompress() does not need
+/// a type hint because codec ID and element width are embedded in the header.
+class TypeAwareCompressCodec {
+ public:
+  /// Check if type-aware compression is supported for the given TAC type.
+  static bool support(int8_t tacType);
+
+  /// Estimate the maximum compressed output size.
+  static int64_t maxCompressedLen(int64_t inputLen, int8_t tacType);
+
+  /// Compress a buffer with a type hint. Returns bytes written to output.
+  static arrow::Result<int64_t>
+  compress(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen, int8_t tacType);
+
+  /// Decompress without a type hint. Self-describing from the payload header.
+  static arrow::Result<int64_t> decompress(const uint8_t* input, int64_t inputLen, uint8_t* output, int64_t outputLen);
+
+ private:
+  enum CodecId : uint8_t {
+    kFFor = 1,
+  };
+
+  static constexpr int64_t kPayloadHeaderSize = sizeof(uint8_t) + sizeof(uint8_t);
+};
+
+} // namespace gluten
diff --git a/cpp/core/utils/tac/ffor.hpp b/cpp/core/utils/tac/ffor.hpp
new file mode 100644
index 000000000000..92ec4fbe69bd
--- /dev/null
+++ b/cpp/core/utils/tac/ffor.hpp
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// 4-lane FFOR (Frame-of-Reference + Bit-Packing) codec for uint64_t.
+// Uses a 4-lane transposed layout for auto-vectorization.
+// Reference: https://www.vldb.org/pvldb/vol16/p2132-afroozeh.pdf
+
+#pragma once
+
+#include <array>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <utility>
+
+namespace gluten {
+namespace ffor {
+
+static constexpr unsigned kLanes = 4;
+
+// Compile-time mask for a given bit width.
+template <unsigned BW>
+static constexpr uint64_t bitmask() {
+  if constexpr (BW == 0) {
+    return 0;
+  } else if constexpr (BW >= 64) {
+    return ~uint64_t(0);
+  } else {
+    return (uint64_t(1) << BW) - 1;
+  }
+}
+
+// Returns number of uint64_t words needed for the compressed output.
+inline constexpr size_t compressedWords(size_t nValues, unsigned bw) {
+  if (bw == 0) {
+    return 0;
+  }
+  const size_t valsPerLane = nValues / kLanes;
+  const size_t wordsPerLane = (valsPerLane * bw + 63) / 64;
+  return wordsPerLane * kLanes;
+}
+
+// FFOR encode: bit-pack nValues uint64_t values with a given base and bit width.
+// nValues must be a multiple of kLanes.
+template <unsigned BW>
+#if defined(__clang__)
+__attribute__((noinline))
+#elif defined(__GNUC__)
+__attribute__((optimize("O3,tree-vectorize"), noinline))
+#endif
+void encode(const uint64_t* __restrict in, uint64_t* __restrict out, uint64_t base, size_t nValues) {
+  static_assert(BW <= 64, "BW must be <= 64");
+
+  if constexpr (BW == 0) {
+    return;
+  } else if constexpr (BW == 64) {
+    for (size_t i = 0; i < nValues; ++i) {
+      out[i] = in[i] - base;
+    }
+    return;
+  } else {
+    constexpr uint64_t kMask = bitmask<BW>();
+    const size_t nGroups = nValues / kLanes;
+
+    uint64_t tmp[kLanes] = {};
+    size_t outOffset = 0;
+    unsigned bitPos = 0;
+
+    for (size_t g = 0; g < nGroups; ++g) {
+#if defined(__clang__)
+#pragma clang loop vectorize(enable) interleave(enable)
+#elif defined(__GNUC__)
+#pragma GCC ivdep
+#endif
+      for (unsigned lane = 0; lane < kLanes; ++lane) {
+        uint64_t val = (in[g * kLanes + lane] - base) & kMask;
+        tmp[lane] |= val << bitPos;
+      }
+
+      unsigned newBitPos = bitPos + BW;
+
+      if (newBitPos >= 64) {
+        for (unsigned lane = 0; lane < kLanes; ++lane) {
+          out[outOffset + lane] = tmp[lane];
+        }
+        outOffset += kLanes;
+
+        unsigned overflow = newBitPos - 64;
+        if (overflow > 0) {
+          for (unsigned lane = 0; lane < kLanes; ++lane) {
+            uint64_t val = (in[g * kLanes + lane] - base) & kMask;
+            tmp[lane] = val >> (BW - overflow);
+          }
+        } else {
+          for (unsigned lane = 0; lane < kLanes; ++lane) {
+            tmp[lane] = 0;
+          }
+        }
+        bitPos = overflow;
+      } else {
+        bitPos = newBitPos;
+      }
+    }
+
+    if (bitPos > 0) {
+      for (unsigned lane = 0; lane < kLanes; ++lane) {
+        out[outOffset + lane] = tmp[lane];
+      }
+    }
+  }
+}
+
+// FFOR decode: unpack nValues uint64_t values with a given base and bit width.
+// nValues must be a multiple of kLanes.
+template <unsigned BW>
+#if defined(__clang__)
+__attribute__((noinline))
+#elif defined(__GNUC__)
+__attribute__((optimize("O3,tree-vectorize"), noinline))
+#endif
+void decode(const uint64_t* __restrict in, uint64_t* __restrict out, uint64_t base, size_t nValues) {
+  static_assert(BW <= 64, "BW must be <= 64");
+
+  if constexpr (BW == 0) {
+    for (size_t i = 0; i < nValues; ++i) {
+      out[i] = base;
+    }
+    return;
+  } else if constexpr (BW == 64) {
+    for (size_t i = 0; i < nValues; ++i) {
+      out[i] = in[i] + base;
+    }
+    return;
+  } else {
+    constexpr uint64_t kMask = bitmask<BW>();
+    const size_t nGroups = nValues / kLanes;
+
+    uint64_t cur[kLanes];
+    size_t inOffset = 0;
+    unsigned bitPos = 0;
+
+    for (unsigned lane = 0; lane < kLanes; ++lane) {
+      cur[lane] = in[inOffset + lane];
+    }
+    inOffset += kLanes;
+
+    for (size_t g = 0; g < nGroups; ++g) {
+#if defined(__clang__)
+#pragma clang loop vectorize(enable) interleave(enable)
+#elif defined(__GNUC__)
+#pragma GCC ivdep
+#endif
+      for (unsigned lane = 0; lane < kLanes; ++lane) {
+        uint64_t val = (cur[lane] >> bitPos) & kMask;
+        out[g * kLanes + lane] = val + base;
+      }
+
+      unsigned newBitPos = bitPos + BW;
+
+      if (newBitPos >= 64) {
+        unsigned overflow = newBitPos - 64;
+
+        if (overflow > 0) {
+          // Straddled values: need bits from the next words.
+          // Safe even on last group — encoder wrote these partial words.
+          for (unsigned lane = 0; lane < kLanes; ++lane) {
+            cur[lane] = in[inOffset + lane];
+          }
+          inOffset += kLanes;
+
+          for (unsigned lane = 0; lane < kLanes; ++lane) {
+            uint64_t prevPart = (in[inOffset - 2 * kLanes + lane] >> bitPos);
+            uint64_t nextPart = cur[lane] << (BW - overflow);
+            out[g * kLanes + lane] = ((prevPart | nextPart) & kMask) + base;
+          }
+        } else if (g + 1 < nGroups) {
+          // Clean 64-bit boundary, more groups to follow — pre-load next words.
+          for (unsigned lane = 0; lane < kLanes; ++lane) {
+            cur[lane] = in[inOffset + lane];
+          }
+          inOffset += kLanes;
+        }
+        // else: clean boundary on last group — values fully decoded, no load needed.
+        bitPos = overflow;
+      } else {
+        bitPos = newBitPos;
+      }
+    }
+  }
+}
+
+// Runtime BW dispatch via compile-time generated jump table.
+namespace detail {
+
+template <unsigned BW>
+void encodeDispatch(const uint64_t* in, uint64_t* out, uint64_t base, size_t n) {
+  encode<BW>(in, out, base, n);
+}
+
+template <unsigned BW>
+void decodeDispatch(const uint64_t* in, uint64_t* out, uint64_t base, size_t n) {
+  decode<BW>(in, out, base, n);
+}
+
+using DispatchFn = void (*)(const uint64_t*, uint64_t*, uint64_t, size_t);
+
+template <size_t... BWs>
+constexpr auto makeEncodeTable(std::index_sequence<BWs...>) {
+  return std::array<DispatchFn, sizeof...(BWs)>{&encodeDispatch<BWs>...};
+}
+
+template <size_t... BWs>
+constexpr auto makeDecodeTable(std::index_sequence<BWs...>) {
+  return std::array<DispatchFn, sizeof...(BWs)>{&decodeDispatch<BWs>...};
+}
+
+inline const auto kEncodeTable = makeEncodeTable(std::make_index_sequence<65>{});
+inline const auto kDecodeTable = makeDecodeTable(std::make_index_sequence<65>{});
+
+} // namespace detail
+
+// Runtime-dispatched encode (when BW is not known at compile time).
+inline void encodeRt(const uint64_t* in, uint64_t* out, uint64_t base, size_t n, unsigned bw) {
+  detail::kEncodeTable[bw](in, out, base, n);
+}
+
+// Runtime-dispatched decode.
+inline void decodeRt(const uint64_t* in, uint64_t* out, uint64_t base, size_t n, unsigned bw) {
+  detail::kDecodeTable[bw](in, out, base, n);
+}
+
+// Compute base (min) and bitwidth for a vector of values.
+inline void analyze(const uint64_t* data, size_t n, uint64_t& base, unsigned& bw) {
+  if (n == 0) {
+    base = 0;
+    bw = 0;
+    return;
+  }
+  uint64_t mn = data[0];
+  uint64_t mx = data[0];
+  for (size_t i = 1; i < n; ++i) {
+    if (data[i] < mn) {
+      mn = data[i];
+    }
+    if (data[i] > mx) {
+      mx = data[i];
+    }
+  }
+  base = mn;
+  uint64_t range = mx - mn;
+  bw = 0;
+  while (range > 0) {
+    bw++;
+    range >>= 1;
+  }
+}
+
+// All headers are 16 bytes (64-bit aligned), so packed data that follows
+// is always naturally aligned — no memcpy needed for encode/decode.
+//
+// Block header (16 bytes, aligned):
+//   | bw (1B) | count (1B) | reserved (6B) | base (8B) |
+//   bw = 0..64: compressed block, count = LANES-groups.
+//   bw = 255: tail marker, count = number of raw values (0..3).
+//   base field is unused (zeroed) in tail marker.
+static constexpr uint8_t kBwTailMarker = 255;
+static constexpr size_t kHeaderSize = 16; // 8-byte aligned
+static constexpr size_t kMaxValuesPerBlock = 256;
+
+// Write a 16-byte block header: | bw(1) | count(1) | reserved(6) | base(8) |
+inline void writeHeader(uint8_t* p, uint8_t bw, uint8_t count, uint64_t base) {
+  p[0] = bw;
+  p[1] = count;
+  std::memset(p + 2, 0, 6);
+  std::memcpy(p + 8, &base, sizeof(base));
+}
+
+// Read a 16-byte block header.
+inline void readHeader(const uint8_t* p, uint8_t& bw, uint8_t& count, uint64_t& base) {
+  bw = p[0];
+  count = p[1];
+  std::memcpy(&base, p + 8, sizeof(base));
+}
+
+// Worst-case compressed buffer size for num values.
+inline constexpr size_t compress64Bound(size_t num) {
+  size_t nBlocks = (num + kMaxValuesPerBlock - 1) / kMaxValuesPerBlock;
+  if (nBlocks == 0) {
+    nBlocks = 1;
+  }
+  // block headers + data + tail header(16) + tail data
+  return (nBlocks + 1) * kHeaderSize + num * sizeof(uint64_t);
+}
+
+// Template-based compress/decompress with alignment dispatch.
+// InAligned: true if input (const uint64_t*) is 8-byte aligned.
+// OutAligned: true if output (uint8_t*) is 8-byte aligned.
+// When aligned, encode_rt/decode_rt work on pointers directly.
+// When not aligned, a per-block aligned temp buffer is used.
+template <bool InAligned, bool OutAligned>
+inline size_t compress64Impl(const uint64_t* input, size_t num, uint8_t* output) {
+  alignas(64) uint64_t tmpIn[kMaxValuesPerBlock];
+  alignas(64) uint64_t tmpOut[kMaxValuesPerBlock];
+
+  uint8_t* outPtr = output;
+  size_t remaining = num;
+  const uint64_t* inPtr = input;
+
+  while (remaining >= kLanes) {
+    size_t blockVals = remaining - (remaining % kLanes);
+    if (blockVals > kMaxValuesPerBlock) {
+      blockVals = kMaxValuesPerBlock;
+    }
+
+    // Analyze — read input via memcpy if unaligned.
+    const uint64_t* analyzeSrc;
+    if constexpr (InAligned) {
+      analyzeSrc = inPtr;
+    } else {
+      std::memcpy(tmpIn, inPtr, blockVals * sizeof(uint64_t));
+      analyzeSrc = tmpIn;
+    }
+
+    uint64_t base;
+    unsigned bw;
+    analyze(analyzeSrc, blockVals, base, bw);
+
+    writeHeader(outPtr, static_cast<uint8_t>(bw), static_cast<uint8_t>(blockVals / kLanes), base);
+    outPtr += kHeaderSize;
+
+    size_t compN = compressedWords(blockVals, bw);
+    size_t compBytes = compN * sizeof(uint64_t);
+
+    // Encode: pick aligned src/dst.
+    const uint64_t* encIn = InAligned ? inPtr : tmpIn;
+    uint64_t* encOut = OutAligned ? reinterpret_cast<uint64_t*>(outPtr) : tmpOut;
+
+    encodeRt(encIn, encOut, base, blockVals, bw);
+
+    if constexpr (!OutAligned) {
+      std::memcpy(outPtr, tmpOut, compBytes);
+    }
+    outPtr += compBytes;
+
+    inPtr += blockVals;
+    remaining -= blockVals;
+  }
+
+  // Tail.
+  writeHeader(outPtr, kBwTailMarker, static_cast<uint8_t>(remaining), 0);
+  outPtr += kHeaderSize;
+
+  if (remaining > 0) {
+    std::memcpy(outPtr, inPtr, remaining * sizeof(uint64_t));
+    outPtr += remaining * sizeof(uint64_t);
+  }
+
+  return static_cast<size_t>(outPtr - output);
+}
+
+// Runtime dispatch — check alignment once, pick the right template.
+inline size_t compress64(const uint64_t* input, size_t num, uint8_t* output) {
+  bool inOk = (reinterpret_cast<uintptr_t>(input) % alignof(uint64_t) == 0);
+  bool outOk = (reinterpret_cast<uintptr_t>(output) % alignof(uint64_t) == 0);
+  if (inOk && outOk) {
+    return compress64Impl<true, true>(input, num, output);
+  }
+  if (inOk && !outOk) {
+    return compress64Impl<true, false>(input, num, output);
+  }
+  if (!inOk && outOk) {
+    return compress64Impl<false, true>(input, num, output);
+  }
+  return compress64Impl<false, false>(input, num, output);
+}
+
+// Template-based decompress with alignment dispatch.
+template <bool InAligned, bool OutAligned>
+inline size_t decompress64Impl(const uint8_t* input, size_t inputSize, uint64_t* output) {
+  alignas(64) uint64_t tmpIn[kMaxValuesPerBlock];
+  alignas(64) uint64_t tmpOut[kMaxValuesPerBlock];
+
+  const uint8_t* inPtr = input;
+  const uint8_t* inEnd = input + inputSize;
+  size_t nDecoded = 0;
+
+  while (inPtr + kHeaderSize <= inEnd) {
+    uint8_t bw;
+    uint8_t count;
+    uint64_t base;
+    readHeader(inPtr, bw, count, base);
+    inPtr += kHeaderSize;
+
+    if (bw == kBwTailMarker) {
+      if (count > 0) {
+        // memcpy handles any alignment, no special case needed.
+        std::memcpy(
+            reinterpret_cast<uint8_t*>(output) + nDecoded * sizeof(uint64_t), inPtr, count * sizeof(uint64_t));
+        nDecoded += count;
+      }
+      break;
+    }
+
+    size_t blockVals = static_cast<size_t>(count) * kLanes;
+    size_t compBytes = compressedWords(blockVals, bw) * sizeof(uint64_t);
+
+    if (inPtr + compBytes > inEnd) {
+      break;
+    }
+
+    // Decode: pick aligned src/dst.
+    const uint64_t* decIn;
+    if constexpr (InAligned) {
+      decIn = reinterpret_cast<const uint64_t*>(inPtr);
+    } else {
+      std::memcpy(tmpIn, inPtr, compBytes);
+      decIn = tmpIn;
+    }
+
+    uint64_t* decOut;
+    if constexpr (OutAligned) {
+      decOut = output + nDecoded;
+    } else {
+      decOut = tmpOut;
+    }
+
+    decodeRt(decIn, decOut, base, blockVals, bw);
+
+    if constexpr (!OutAligned) {
+      std::memcpy(
+          reinterpret_cast<uint8_t*>(output) + nDecoded * sizeof(uint64_t), tmpOut, blockVals * sizeof(uint64_t));
+    }
+
+    inPtr += compBytes;
+    nDecoded += blockVals;
+  }
+
+  return nDecoded;
+}
+
+// Runtime dispatch.
+inline size_t decompress64(const uint8_t* input, size_t inputSize, uint64_t* output) {
+  bool inOk = (reinterpret_cast<uintptr_t>(input) % alignof(uint64_t) == 0);
+  bool outOk = (reinterpret_cast<uintptr_t>(output) % alignof(uint64_t) == 0);
+  if (inOk && outOk) {
+    return decompress64Impl<true, true>(input, inputSize, output);
+  }
+  if (inOk && !outOk) {
+    return decompress64Impl<true, false>(input, inputSize, output);
+  }
+  if (!inOk && outOk) {
+    return decompress64Impl<false, true>(input, inputSize, output);
+  }
+  return decompress64Impl<false, false>(input, inputSize, output);
+}
+
+} // namespace ffor
+} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e8b455e448f5..4e7c22cfb166 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -17,6 +17,7 @@
 #include "shuffle/VeloxHashShuffleWriter.h"
 #include "memory/ArrowMemory.h"
+#include "shuffle/VeloxTypeAwareCompress.h"
 #include "memory/VeloxColumnarBatch.h"
 #include "shuffle/Utils.h"
 #include "utils/Common.h"
@@ -764,6 +765,7 @@ arrow::Status VeloxHashShuffleWriter::initColumnTypes(const facebook::velox::Row
         isValidityBuffer_.push_back(true);
         isValidityBuffer_.push_back(false);
         isValidityBuffer_.push_back(false);
+        bufferTypes_.insert(bufferTypes_.end(), 3, tac::kUnsupported);
       } break;
       case arrow::StructType::type_id:
       case arrow::MapType::type_id:
@@ -777,6 +779,7 @@ arrow::Status
VeloxHashShuffleWriter::initColumnTypes(const facebook::velox::Row
         simpleColumnIndices_.push_back(i);
         isValidityBuffer_.push_back(true);
         isValidityBuffer_.push_back(boolIsBit());
+        bufferTypes_.insert(bufferTypes_.end(), 2, tac::kUnsupported);
       } break;
       case arrow::NullType::type_id:
         break;
@@ -784,12 +787,15 @@ arrow::Status VeloxHashShuffleWriter::initColumnTypes(const facebook::velox::Row
         simpleColumnIndices_.push_back(i);
         isValidityBuffer_.push_back(true);
         isValidityBuffer_.push_back(false);
+        bufferTypes_.push_back(tac::kUnsupported); // validity
+        bufferTypes_.push_back(veloxTypeToTacType(veloxColumnTypes_[i]->kind())); // data
       } break;
     }
   }
   if (hasComplexType_) {
     isValidityBuffer_.push_back(false);
+    bufferTypes_.push_back(tac::kUnsupported);
   }
 
   fixedWidthColumnCount_ = simpleColumnIndices_.size();
@@ -973,8 +979,9 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
     bool reuseBuffers) {
   if (!buffers.empty()) {
-    auto payload =
-        std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_);
+    auto* types = partitionWriter_->enableTypeAwareCompress() ? &bufferTypes_ : nullptr;
+    auto payload = std::make_unique<InMemoryPayload>(
+        numRows, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types);
     RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, writtenBytes_));
   }
   return arrow::Status::OK();
@@ -1393,8 +1400,9 @@ arrow::Result<int64_t> VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
   for (auto& item : pidToSize) {
     auto pid = item.first;
     ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
+    auto* types = partitionWriter_->enableTypeAwareCompress() ? &bufferTypes_ : nullptr;
     auto payload = std::make_unique<InMemoryPayload>(
-        item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_);
+        item.second, &isValidityBuffer_, schema_, std::move(buffers), hasComplexType_, types);
     metrics_.totalBytesToEvict += payload->rawSize();
     RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, writtenBytes_));
     evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index fd889d4c327e..9947f7bc19e8 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -336,6 +336,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
   bool hasComplexType_ = false;
   std::vector<bool> isValidityBuffer_;
+  std::vector<int8_t> bufferTypes_;
 
   // Store arrow column types. Calculated once.
   std::vector<std::shared_ptr<arrow::DataType>> arrowColumnTypes_;
diff --git a/cpp/velox/shuffle/VeloxTypeAwareCompress.h b/cpp/velox/shuffle/VeloxTypeAwareCompress.h
new file mode 100644
index 000000000000..fbbabc5c06d9
--- /dev/null
+++ b/cpp/velox/shuffle/VeloxTypeAwareCompress.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "utils/tac/TypeAwareCompressCodec.h"
+#include "velox/type/Type.h"
+
+namespace gluten {
+
+/// Convert a Velox TypeKind to a TAC data type for type-aware compression.
+/// Returns tac::kUnsupported for types that cannot be compressed by TAC.
+inline int8_t veloxTypeToTacType(facebook::velox::TypeKind kind) {
+  switch (kind) {
+    case facebook::velox::TypeKind::BIGINT:
+      return tac::kUInt64;
+    default:
+      return tac::kUnsupported;
+  }
+}
+
+} // namespace gluten
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index e79c8de1de1d..bfb783f4be74 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -43,6 +43,7 @@ struct ShuffleTestParams {
   int32_t diskWriteBufferSize{0};
   bool useRadixSort{false};
   bool enableDictionary{false};
+  bool enableTypeAwareCompress{false};
   int64_t deserializerBufferSize{0};
 
   std::string toString() const {
@@ -54,6 +55,7 @@ struct ShuffleTestParams {
        << ", compressionBufferSize = " << diskWriteBufferSize
        << ", useRadixSort = " << (useRadixSort ? "true" : "false")
        << ", enableDictionary = " << (enableDictionary ? "true" : "false")
+       << ", enableTypeAwareCompress = " << (enableTypeAwareCompress ? "true" : "false")
        << ", deserializerBufferSize = " << deserializerBufferSize;
     return out.str();
   }
@@ -132,13 +134,16 @@ std::vector<ShuffleTestParams> getTestParams() {
     // Local.
     for (const auto mergeBufferSize : mergeBufferSizes) {
       for (const bool enableDictionary : {true, false}) {
-        params.push_back(ShuffleTestParams{
-            .shuffleWriterType = ShuffleWriterType::kHashShuffle,
-            .partitionWriterType = PartitionWriterType::kLocal,
-            .compressionType = compression,
-            .compressionThreshold = compressionThreshold,
-            .mergeBufferSize = mergeBufferSize,
-            .enableDictionary = enableDictionary});
+        for (const bool enableTypeAwareCompress : {true, false}) {
+          params.push_back(ShuffleTestParams{
+              .shuffleWriterType = ShuffleWriterType::kHashShuffle,
+              .partitionWriterType = PartitionWriterType::kLocal,
+              .compressionType = compression,
+              .compressionThreshold = compressionThreshold,
+              .mergeBufferSize = mergeBufferSize,
+              .enableDictionary = enableDictionary,
+              .enableTypeAwareCompress = enableTypeAwareCompress});
+        }
       }
     }
 
@@ -162,7 +167,8 @@ std::shared_ptr<PartitionWriter> createPartitionWriter(
     arrow::Compression::type compressionType,
     int32_t mergeBufferSize,
     int32_t compressionThreshold,
-    bool enableDictionary) {
+    bool enableDictionary,
+    bool enableTypeAwareCompress = false) {
   GLUTEN_ASSIGN_OR_THROW(auto codec, arrow::util::Codec::Create(compressionType));
   switch (partitionWriterType) {
     case PartitionWriterType::kLocal: {
@@ -170,6 +176,7 @@ std::shared_ptr<PartitionWriter> createPartitionWriter(
       options->mergeBufferSize = mergeBufferSize;
       options->compressionThreshold = compressionThreshold;
       options->enableDictionary = enableDictionary;
+      options->enableTypeAwareCompress = enableTypeAwareCompress;
      return std::make_shared<LocalPartitionWriter>(
          numPartitions, std::move(codec), getDefaultMemoryManager(), options, dataFile, std::move(localDirs));
    }
@@ -248,7 +255,8 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam
0, s"must be positive.")