Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ class ColumnarShuffleWriter[K, V](
conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt,
tempDataFile.getAbsolutePath,
localDirs,
GlutenConfig.get.columnarShuffleEnableDictionary
GlutenConfig.get.columnarShuffleEnableDictionary,
GlutenConfig.get.columnarShuffleEnableTypeAwareCompress
)

nativeShuffleWriter = if (isSort) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/Spill.cc
shuffle/Utils.cc
utils/Compression.cc
utils/tac/FForCodec.cc
utils/tac/TypeAwareCompressCodec.cc
utils/StringUtil.cc
utils/ObjectStore.cc
jni/JniError.cc
Expand Down
6 changes: 4 additions & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,8 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
jint shuffleFileBufferSize,
jstring dataFileJstr,
jstring localDirsJstr,
jboolean enableDictionary) {
jboolean enableDictionary,
jboolean enableTypeAwareCompress) {
JNI_METHOD_START

const auto ctx = getRuntime(env, wrapper);
Expand All @@ -960,7 +961,8 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition
mergeBufferSize,
mergeThreshold,
numSubDirs,
enableDictionary);
enableDictionary,
enableTypeAwareCompress);

auto partitionWriter = std::make_shared<LocalPartitionWriter>(
numPartitions,
Expand Down
6 changes: 6 additions & 0 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class LocalPartitionWriter : public PartitionWriter {
return arrow::Status::NotImplemented("Invalid code path for local shuffle writer.");
}

bool enableTypeAwareCompress() const override {
  // Dictionary encoding rewrites buffer layout and types, so the type-aware
  // codec cannot be layered on top of it. Honor the flag only when
  // dictionary encoding is switched off.
  return !options_->enableDictionary && options_->enableTypeAwareCompress;
}

/// The stop function performs several tasks:
/// 1. Opens the final data file.
/// 2. Iterates over each partition ID (pid) to:
Expand Down
8 changes: 6 additions & 2 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static constexpr int64_t kDefaultReadBufferSize = 1 << 20;
static constexpr int64_t kDefaultDeserializerBufferSize = 1 << 20;
static constexpr int64_t kDefaultShuffleFileBufferSize = 32 << 10;
static constexpr bool kDefaultEnableDictionary = false;
static constexpr bool kDefaultEnableTypeAwareCompress = false;

enum class ShuffleWriterType { kHashShuffle, kSortShuffle, kRssSortShuffle, kGpuHashShuffle };

Expand Down Expand Up @@ -175,6 +176,7 @@ struct LocalPartitionWriterOptions {
int32_t numSubDirs = kDefaultNumSubDirs; // spark.diskStore.subDirectories

bool enableDictionary = kDefaultEnableDictionary;
bool enableTypeAwareCompress = kDefaultEnableTypeAwareCompress;

LocalPartitionWriterOptions() = default;

Expand All @@ -185,14 +187,16 @@ struct LocalPartitionWriterOptions {
int32_t mergeBufferSize,
double mergeThreshold,
int32_t numSubDirs,
bool enableDictionary)
bool enableDictionary,
bool enableTypeAwareCompress = kDefaultEnableTypeAwareCompress)
: shuffleFileBufferSize(shuffleFileBufferSize),
compressionBufferSize(compressionBufferSize),
compressionThreshold(compressionThreshold),
mergeBufferSize(mergeBufferSize),
mergeThreshold(mergeThreshold),
numSubDirs(numSubDirs),
enableDictionary(enableDictionary) {}
enableDictionary(enableDictionary),
enableTypeAwareCompress(enableTypeAwareCompress) {}
};

struct RssPartitionWriterOptions {
Expand Down
4 changes: 4 additions & 0 deletions cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class PartitionWriter : public Reclaimable {
return payloadPool_->bytes_allocated();
}

/// Whether payload buffers should be compressed with the type-aware codec.
/// Base implementation disables it; writers that support the feature
/// (e.g. LocalPartitionWriter) override this based on their options.
virtual bool enableTypeAwareCompress() const {
  return false;
}

protected:
uint32_t numPartitions_;
std::unique_ptr<arrow::util::Codec> codec_;
Expand Down
129 changes: 111 additions & 18 deletions cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "shuffle/Utils.h"
#include "utils/Exception.h"
#include "utils/Timer.h"
#include "utils/tac/TypeAwareCompressCodec.h"

namespace gluten {
namespace {
Expand All @@ -36,6 +37,7 @@ static const Payload::Type kUncompressedType = gluten::BlockPayload::kUncompress
static constexpr int64_t kZeroLengthBuffer = 0;
static constexpr int64_t kNullBuffer = -1;
static constexpr int64_t kUncompressedBuffer = -2;
static constexpr int64_t kTypeAwareBuffer = -3;

template <typename T>
void write(uint8_t** dst, T data) {
Expand Down Expand Up @@ -86,6 +88,51 @@ arrow::Result<int64_t> compressBuffer(
return kCompressedBufferHeaderLength + compressedLength;
}

// Compresses a single buffer with TypeAwareCompressCodec and serializes it to `output`.
// On-wire layout when compression helps:
//   | kTypeAwareBuffer (int64) | uncompressed length (int64) | compressed length (int64) | payload |
// When the codec cannot shrink the buffer, the standard uncompressed fallback is
// emitted instead, byte-identical to what compressBuffer produces:
//   | kUncompressedBuffer (int64) | uncompressed length (int64) | raw bytes |
// Returns the total number of bytes written to `output`.
arrow::Result<int64_t> compressTypeAwareBuffer(
    const std::shared_ptr<arrow::Buffer>& buffer,
    uint8_t* output,
    int64_t outputLength,
    int8_t typeKind) {
  auto cursor = &output;
  // Null and zero-length buffers collapse to a single marker word.
  if (!buffer) {
    write<int64_t>(cursor, kNullBuffer);
    return sizeof(int64_t);
  }
  if (buffer->size() == 0) {
    write<int64_t>(cursor, kZeroLengthBuffer);
    return sizeof(int64_t);
  }

  // marker + uncompressed length + compressed length.
  constexpr int64_t kHeaderLength = 3 * sizeof(int64_t);
  // Require room for the worst case we may emit ourselves: the uncompressed
  // fallback needs header space plus the raw bytes.
  if (outputLength < kHeaderLength + buffer->size()) {
    return arrow::Status::Invalid("Output buffer too small for type-aware compression.");
  }
  auto* payloadDst = output + kHeaderLength;
  auto payloadCapacity = outputLength - kHeaderLength;

  ARROW_ASSIGN_OR_RAISE(
      auto tacSize,
      TypeAwareCompressCodec::compress(buffer->data(), buffer->size(), payloadDst, payloadCapacity, typeKind));

  if (tacSize >= buffer->size()) {
    // No gain from compression: fall back to the standard uncompressed layout,
    // exactly as compressBuffer does. The stale payload at payloadDst is
    // simply not accounted for in the returned length.
    write<int64_t>(cursor, kUncompressedBuffer);
    write(cursor, static_cast<int64_t>(buffer->size()));
    memcpy(*cursor, buffer->data(), buffer->size());
    return 2 * sizeof(int64_t) + buffer->size();
  }

  // Payload bytes are already in place at payloadDst; back-fill the header.
  write<int64_t>(cursor, kTypeAwareBuffer);
  write(cursor, static_cast<int64_t>(buffer->size()));
  write(cursor, static_cast<int64_t>(tacSize));
  return kHeaderLength + tacSize;
}

arrow::Status compressAndFlush(
const std::shared_ptr<arrow::Buffer>& buffer,
arrow::io::OutputStream* outputStream,
Expand Down Expand Up @@ -146,6 +193,24 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> readCompressedBuffer(

int64_t uncompressedLength;
RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &uncompressedLength));

if (compressedLength == kTypeAwareBuffer) {
// Type-aware compressed buffer. This marker only appears when compression helped.
// Wire format: compressedLength (int64) already consumed above as kTypeAwareBuffer,
// then uncompressedLength (already read), then actualCompressedLen, then data.
int64_t actualCompressedLen;
RETURN_NOT_OK(inputStream->Read(sizeof(int64_t), &actualCompressedLen));
ARROW_ASSIGN_OR_RAISE(auto compressed, arrow::AllocateResizableBuffer(actualCompressedLen, pool));
RETURN_NOT_OK(inputStream->Read(actualCompressedLen, compressed->mutable_data()));

timer.switchTo(&decompressTime);
ARROW_ASSIGN_OR_RAISE(auto output, arrow::AllocateResizableBuffer(uncompressedLength, pool));
RETURN_NOT_OK(TypeAwareCompressCodec::decompress(
compressed->data(), actualCompressedLen, output->mutable_data(), uncompressedLength)
.status());
return output;
}

if (compressedLength == kUncompressedBuffer) {
ARROW_ASSIGN_OR_RAISE(auto uncompressed, arrow::AllocateResizableBuffer(uncompressedLength, pool));
RETURN_NOT_OK(inputStream->Read(uncompressedLength, uncompressed->mutable_data()));
Expand Down Expand Up @@ -185,25 +250,38 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
arrow::util::Codec* codec) {
arrow::util::Codec* codec,
const std::vector<int8_t>* bufferTypes) {
const uint32_t numBuffers = buffers.size();

if (payloadType == Payload::Type::kCompressed) {
Timer compressionTime;
compressionTime.start();
// Compress.
auto maxLength = maxCompressedLength(buffers, codec);
std::shared_ptr<arrow::Buffer> compressedBuffer;

// Compute max compressed length, accounting for type-aware compression where applicable.
auto maxLength = maxCompressedLength(buffers, codec, bufferTypes);

std::shared_ptr<arrow::Buffer> compressedBuffer;
ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool));
auto* output = compressedBuffer->mutable_data();

int64_t actualLength = 0;
// Compress buffers one by one.
for (auto& buffer : buffers) {
for (size_t i = 0; i < buffers.size(); ++i) {
auto availableLength = maxLength - actualLength;
// Release buffer after compression.
ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec));
auto typeKind =
(bufferTypes != nullptr && i < bufferTypes->size()) ? (*bufferTypes)[i] : tac::kUnsupported;

int64_t compressedSize = 0;
if (TypeAwareCompressCodec::support(typeKind)) {
// Use type-aware compression for supported types.
ARROW_ASSIGN_OR_RAISE(
compressedSize, compressTypeAwareBuffer(std::move(buffers[i]), output, availableLength, typeKind));
} else {
// Use standard codec (LZ4/ZSTD) for unsupported types.
ARROW_ASSIGN_OR_RAISE(
compressedSize, compressBuffer(std::move(buffers[i]), output, availableLength, codec));
}
output += compressedSize;
actualLength += compressedSize;
}
Expand Down Expand Up @@ -327,16 +405,29 @@ int64_t BlockPayload::rawSize() {

// Upper bound on the serialized size of `buffers` under the compressed block layout.
// Standard codec layout per buffer:  | compressedLength | uncompressedLength | data |
// Type-aware layout per buffer:      | kTypeAwareBuffer | uncompressedLength | compressedLength | data |
// i.e. type-aware buffers carry one extra int64 of metadata.
// NOTE: this span previously contained both the pre-change std::accumulate body and
// the post-change loop (diff residue with duplicate declarations); only the
// post-change implementation is kept here.
int64_t BlockPayload::maxCompressedLength(
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::util::Codec* codec,
    const std::vector<int8_t>* bufferTypes) {
  // Two metadata words per buffer for the standard layout; null buffers use
  // fewer at serialization time, so this stays a safe upper bound.
  int64_t metadataLength = sizeof(int64_t) * 2 * buffers.size();
  int64_t totalCompressedLength = 0;
  for (size_t i = 0; i < buffers.size(); ++i) {
    const auto& buffer = buffers[i];
    if (!buffer) {
      continue;
    }
    const bool typeAware = bufferTypes != nullptr && i < bufferTypes->size() &&
        TypeAwareCompressCodec::support((*bufferTypes)[i]);
    if (typeAware) {
      // Extra marker word plus the type-aware codec's own worst-case estimate.
      metadataLength += sizeof(int64_t);
      totalCompressedLength += TypeAwareCompressCodec::maxCompressedLen(buffer->size(), (*bufferTypes)[i]);
    } else {
      // Standard codec (LZ4/ZSTD) worst case.
      totalCompressedLength += codec->MaxCompressedLen(buffer->size(), buffer->data());
    }
  }
  return metadataLength + totalCompressedLength;
}

Expand Down Expand Up @@ -413,12 +504,14 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
}
}
}
return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, source->schema(), std::move(merged));
return std::make_unique<InMemoryPayload>(
mergedRows, isValidityBuffer, source->schema(), std::move(merged), false, source->bufferTypes_);
}

// Converts this in-memory payload into a (possibly compressed) BlockPayload.
// Buffer ownership is transferred into the block payload, leaving buffers_ empty.
// bufferTypes_ (may be null) is forwarded so fromBuffers can apply the
// type-aware codec to buffers whose type supports it.
// NOTE(review): bufferTypes_ is a non-owning pointer — confirm the pointee
// outlives this conversion at all call sites.
// (The stale pre-change return line left behind by the diff scrape is removed;
// only the post-change call that forwards bufferTypes_ is kept.)
arrow::Result<std::unique_ptr<BlockPayload>>
InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) {
  return BlockPayload::fromBuffers(
      payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, bufferTypes_);
}

arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) {
Expand Down
13 changes: 9 additions & 4 deletions cpp/core/shuffle/Payload.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class BlockPayload final : public Payload {
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
const std::vector<bool>* isValidityBuffer,
arrow::MemoryPool* pool,
arrow::util::Codec* codec);
arrow::util::Codec* codec,
const std::vector<int8_t>* bufferTypes = nullptr);

static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> deserialize(
arrow::io::InputStream* inputStream,
Expand All @@ -91,7 +92,8 @@ class BlockPayload final : public Payload {

static int64_t maxCompressedLength(
const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
arrow::util::Codec* codec);
arrow::util::Codec* codec,
const std::vector<int8_t>* bufferTypes = nullptr);

arrow::Status serialize(arrow::io::OutputStream* outputStream) override;

Expand Down Expand Up @@ -121,11 +123,13 @@ class InMemoryPayload final : public Payload {
const std::vector<bool>* isValidityBuffer,
const std::shared_ptr<arrow::Schema>& schema,
std::vector<std::shared_ptr<arrow::Buffer>> buffers,
bool hasComplexType = false)
bool hasComplexType = false,
const std::vector<int8_t>* bufferTypes = nullptr)
: Payload(Type::kUncompressed, numRows, isValidityBuffer),
schema_(schema),
buffers_(std::move(buffers)),
hasComplexType_(hasComplexType) {}
hasComplexType_(hasComplexType),
bufferTypes_(bufferTypes) {}

static arrow::Result<std::unique_ptr<InMemoryPayload>>
merge(std::unique_ptr<InMemoryPayload> source, std::unique_ptr<InMemoryPayload> append, arrow::MemoryPool* pool);
Expand Down Expand Up @@ -155,6 +159,7 @@ class InMemoryPayload final : public Payload {
std::shared_ptr<arrow::Schema> schema_;
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
bool hasComplexType_;
const std::vector<int8_t>* bufferTypes_;
};

class UncompressedDiskBlockPayload final : public Payload {
Expand Down
1 change: 1 addition & 0 deletions cpp/core/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
add_test_case(round_robin_partitioner_test SOURCES RoundRobinPartitionerTest.cc)
add_test_case(object_store_test SOURCES ObjectStoreTest.cc)
add_test_case(memory_allocator_test SOURCES MemoryAllocatorTest.cc)
add_test_case(ffor_codec_test SOURCES FForCodecTest.cc)
Loading
Loading