From 2b78e35358bc25b9a02240f095616bce38ff5277 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 1 Apr 2026 19:10:53 +0300 Subject: [PATCH 1/4] [VL] Normalize Spark GMT session timezone before native validation --- .../apache/gluten/execution/MiscOperatorSuite.scala | 8 ++++++++ cpp/core/config/GlutenConfig.cc | 11 +++++++++++ cpp/core/config/GlutenConfig.h | 3 +++ cpp/velox/compute/WholeStageResultIterator.cc | 3 ++- cpp/velox/jni/VeloxJniWrapper.cc | 5 +++-- cpp/velox/substrait/SubstraitToVeloxPlanValidator.h | 11 +++++++++-- cpp/velox/utils/VeloxWriterUtils.cc | 4 +++- 7 files changed, 39 insertions(+), 6 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index f5196cb8c0c2..6bdd6103cf42 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -1861,6 +1861,14 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa assert(plan2.find(_.isInstanceOf[ProjectExecTransformer]).isDefined) } + test("cast date to timestamp with GMT session timezone") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") { + runQueryAndCompare("SELECT cast(date'2023-01-02 01:01:01' as timestamp) as ts") { + checkGlutenPlan[ProjectExecTransformer] + } + } + } + test("cast timestamp to date") { val query = "select cast(ts as date) from values (timestamp'2024-01-01 00:00:00') as tab(ts)" runQueryAndCompare(query) { diff --git a/cpp/core/config/GlutenConfig.cc b/cpp/core/config/GlutenConfig.cc index 0afd458ee636..eb98f6bb9036 100644 --- a/cpp/core/config/GlutenConfig.cc +++ b/cpp/core/config/GlutenConfig.cc @@ -49,6 +49,17 @@ parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength) return sparkConfs; } +std::string normalizeSessionTimezone(std::string_view sessionTimezone) { + if (sessionTimezone == "GMT") { + return "UTC"; + } + if (sessionTimezone.size() > 3 && sessionTimezone.substr(0, 3) == "GMT" && + (sessionTimezone[3] == '+' || sessionTimezone[3] == '-')) { + return std::string("UTC").append(sessionTimezone.substr(3)); + } + return std::string(sessionTimezone); +} + std::string printConfig(const std::unordered_map& conf) { std::ostringstream oss; oss << std::endl; diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index a082a720eadd..2b8ba54595ba 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -20,6 +20,7 @@ #include #include #include +#include #include namespace gluten { @@ -102,5 +103,7 @@ const std::string kDebugCudfDefault = "false"; std::unordered_map parseConfMap(JNIEnv* env, const uint8_t* planData, const int32_t planDataLength); +std::string normalizeSessionTimezone(std::string_view sessionTimezone); + std::string printConfig(const std::unordered_map& conf); } // namespace gluten diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 3c0505263159..1806bfdd623a 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -575,7 +575,8 @@ std::unordered_map WholeStageResultIterator::getQueryC std::to_string(veloxCfg_->get(kVeloxPreferredBatchBytes, 10L << 20)); try { configs[velox::core::QueryConfig::kSparkAnsiEnabled] = veloxCfg_->get(kAnsiEnabled, "false"); - configs[velox::core::QueryConfig::kSessionTimezone] = veloxCfg_->get(kSessionTimezone, ""); + configs[velox::core::QueryConfig::kSessionTimezone] = + normalizeSessionTimezone(veloxCfg_->get(kSessionTimezone, "")); // Adjust timestamp according to the above configured session timezone. configs[velox::core::QueryConfig::kAdjustTimestampToTimezone] = "true"; diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 4705a646e2d6..b76dd22f2a7f 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -169,7 +169,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail } const auto pool = defaultLeafVeloxMemoryPool().get(); - SubstraitToVeloxPlanValidator planValidator(pool); + SubstraitToVeloxPlanValidator planValidator(pool, ctx->getConfMap()); ::substrait::Plan subPlan; parseProtobuf(planData, planSize, &subPlan); @@ -226,8 +226,9 @@ JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWra env->DeleteLocalRef(mapping); } + const auto ctx = getRuntime(env, wrapper); auto pool = defaultLeafVeloxMemoryPool().get(); - SubstraitToVeloxPlanValidator planValidator(pool); + SubstraitToVeloxPlanValidator planValidator(pool, ctx->getConfMap()); auto inputType = SubstraitParser::parseType(inputSubstraitType); if (inputType->kind() != TypeKind::ROW) { throw GlutenException("Input type is not a RowType."); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 8afc7c5bf8b2..03d443fad720 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -18,6 +18,7 @@ #pragma once #include +#include "config/GlutenConfig.h" #include "SubstraitToVeloxPlan.h" #include "velox/core/QueryCtx.h" @@ -29,9 +30,15 @@ namespace gluten { /// a Substrait plan is supported in Velox. class SubstraitToVeloxPlanValidator { public: - SubstraitToVeloxPlanValidator(memory::MemoryPool* pool) { + explicit SubstraitToVeloxPlanValidator( + memory::MemoryPool* pool, + const std::unordered_map& confMap = {}) { + const auto it = confMap.find(kSessionTimezone); + const auto sessionTimezone = + normalizeSessionTimezone(it == confMap.end() ? std::string_view("UTC") : std::string_view(it->second)); std::unordered_map configs{ - {velox::core::QueryConfig::kSparkPartitionId, "0"}, {velox::core::QueryConfig::kSessionTimezone, "GMT"}}; + {velox::core::QueryConfig::kSparkPartitionId, "0"}, + {velox::core::QueryConfig::kSessionTimezone, sessionTimezone}}; veloxCfg_ = std::make_shared(std::move(configs)); planConverter_ = std::make_unique( pool, veloxCfg_.get(), std::vector>{}, std::nullopt, std::nullopt, true); diff --git a/cpp/velox/utils/VeloxWriterUtils.cc b/cpp/velox/utils/VeloxWriterUtils.cc index 026418a223c4..bd3cca685411 100644 --- a/cpp/velox/utils/VeloxWriterUtils.cc +++ b/cpp/velox/utils/VeloxWriterUtils.cc @@ -89,7 +89,9 @@ std::unique_ptr makeParquetWriteOption(const std::unordered_mapflushPolicyFactory = [maxRowGroupRows, maxRowGroupBytes]() { return std::make_unique(maxRowGroupRows, maxRowGroupBytes, [&]() { return false; }); }; - writeOption->parquetWriteTimestampTimeZone = getConfigValue(sparkConfs, kSessionTimezone, std::nullopt); + if (auto it = sparkConfs.find(kSessionTimezone); it != sparkConfs.end()) { + writeOption->parquetWriteTimestampTimeZone = normalizeSessionTimezone(it->second); + } writeOption->arrowMemoryPool = getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool"); if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end()) { From d9af892a1e3cda373b9c3c23f7a5241de831077b Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Thu, 2 Apr 2026 12:45:36 +0300 Subject: [PATCH 2/4] [VL] Apply clang-format to timezone workaround changes --- cpp/velox/compute/WholeStageResultIterator.cc | 12 ++++++------ cpp/velox/jni/VeloxJniWrapper.cc | 4 ++-- cpp/velox/substrait/SubstraitToVeloxPlanValidator.h | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 1806bfdd623a..3f85dddb83f4 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -25,14 +25,13 @@ #include "velox/exec/PlanNodeStats.h" #ifdef GLUTEN_ENABLE_GPU #include +#include "cudf/GpuLock.h" #include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h" #include "velox/experimental/cudf/exec/ToCudf.h" -#include "cudf/GpuLock.h" #endif #include "operators/plannodes/RowVectorStream.h" - using namespace facebook; namespace gluten { @@ -358,14 +357,15 @@ void WholeStageResultIterator::constructPartitionColumns( } void WholeStageResultIterator::addIteratorSplits(const std::vector>& inputIterators) { - GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); + GLUTEN_CHECK( + !allSplitsAdded_, + "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); // Create IteratorConnectorSplit for each iterator for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) { if (inputIterators[i] == nullptr) { continue; } - auto connectorSplit = std::make_shared( - kIteratorConnectorId, inputIterators[i]); + auto connectorSplit = std::make_shared(kIteratorConnectorId, inputIterators[i]); exec::Split split(folly::copy(connectorSplit), -1); task_->addSplit(streamIds_[i], std::move(split)); } @@ -385,7 +385,7 @@ void WholeStageResultIterator::noMoreSplits() { for (const auto& scanNodeId : scanNodeIds_) { task_->noMoreSplits(scanNodeId); } - + // Mark no more splits for all stream nodes for (const auto& streamId : streamIds_) { task_->noMoreSplits(streamId); diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index b76dd22f2a7f..0fe8efeccb92 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -462,8 +462,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra auto arrowPool = dynamic_cast(ctx->memoryManager())->defaultArrowMemoryPool(); auto pool = dynamic_cast(ctx->memoryManager())->getLeafMemoryPool(); auto iter = makeJniColumnarBatchIterator(env, jIter, ctx); - auto appender = std::make_shared( - std::make_unique(arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter))); + auto appender = std::make_shared(std::make_unique( + arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter))); return ctx->saveObject(appender); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 03d443fad720..3e12942d6e67 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -18,8 +18,8 @@ #pragma once #include -#include "config/GlutenConfig.h" #include "SubstraitToVeloxPlan.h" +#include "config/GlutenConfig.h" #include "velox/core/QueryCtx.h" using namespace facebook; From 7b07767bb675a8783f08396c5ae94405082b08fb Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Thu, 9 Apr 2026 21:15:34 +0300 Subject: [PATCH 3/4] [VL] Address review feedback on timezone workaround --- cpp/velox/compute/WholeStageResultIterator.cc | 12 ++++++------ cpp/velox/jni/VeloxJniWrapper.cc | 9 ++++----- cpp/velox/substrait/SubstraitToVeloxPlanValidator.h | 10 ++-------- 3 files changed, 12 insertions(+), 19 deletions(-) diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 3f85dddb83f4..1806bfdd623a 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -25,13 +25,14 @@ #include "velox/exec/PlanNodeStats.h" #ifdef GLUTEN_ENABLE_GPU #include -#include "cudf/GpuLock.h" #include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h" #include "velox/experimental/cudf/exec/ToCudf.h" +#include "cudf/GpuLock.h" #endif #include "operators/plannodes/RowVectorStream.h" + using namespace facebook; namespace gluten { @@ -357,15 +358,14 @@ void WholeStageResultIterator::constructPartitionColumns( } void WholeStageResultIterator::addIteratorSplits(const std::vector>& inputIterators) { - GLUTEN_CHECK( - !allSplitsAdded_, - "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); + GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); // Create IteratorConnectorSplit for each iterator for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) { if (inputIterators[i] == nullptr) { continue; } - auto connectorSplit = std::make_shared(kIteratorConnectorId, inputIterators[i]); + auto connectorSplit = std::make_shared( + kIteratorConnectorId, inputIterators[i]); exec::Split split(folly::copy(connectorSplit), -1); task_->addSplit(streamIds_[i], std::move(split)); } @@ -385,7 +385,7 @@ void WholeStageResultIterator::noMoreSplits() { for (const auto& scanNodeId : scanNodeIds_) { task_->noMoreSplits(scanNodeId); } - + // Mark no more splits for all stream nodes for (const auto& streamId : streamIds_) { task_->noMoreSplits(streamId); diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 0fe8efeccb92..4705a646e2d6 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -169,7 +169,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFail } const auto pool = defaultLeafVeloxMemoryPool().get(); - SubstraitToVeloxPlanValidator planValidator(pool, ctx->getConfMap()); + SubstraitToVeloxPlanValidator planValidator(pool); ::substrait::Plan subPlan; parseProtobuf(planData, planSize, &subPlan); @@ -226,9 +226,8 @@ JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWra env->DeleteLocalRef(mapping); } - const auto ctx = getRuntime(env, wrapper); auto pool = defaultLeafVeloxMemoryPool().get(); - SubstraitToVeloxPlanValidator planValidator(pool, ctx->getConfMap()); + SubstraitToVeloxPlanValidator planValidator(pool); auto inputType = SubstraitParser::parseType(inputSubstraitType); if (inputType->kind() != TypeKind::ROW) { throw GlutenException("Input type is not a RowType."); @@ -462,8 +461,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra auto arrowPool = dynamic_cast(ctx->memoryManager())->defaultArrowMemoryPool(); auto pool = dynamic_cast(ctx->memoryManager())->getLeafMemoryPool(); auto iter = makeJniColumnarBatchIterator(env, jIter, ctx); - auto appender = std::make_shared(std::make_unique( - arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter))); + auto appender = std::make_shared( + std::make_unique(arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter))); return ctx->saveObject(appender); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 3e12942d6e67..4284d2fdb19e 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -19,7 +19,6 @@ #include #include "SubstraitToVeloxPlan.h" -#include "config/GlutenConfig.h" #include "velox/core/QueryCtx.h" using namespace facebook; @@ -30,15 +29,10 @@ namespace gluten { /// a Substrait plan is supported in Velox. class SubstraitToVeloxPlanValidator { public: - explicit SubstraitToVeloxPlanValidator( - memory::MemoryPool* pool, - const std::unordered_map& confMap = {}) { - const auto it = confMap.find(kSessionTimezone); - const auto sessionTimezone = - normalizeSessionTimezone(it == confMap.end() ? std::string_view("UTC") : std::string_view(it->second)); + SubstraitToVeloxPlanValidator(memory::MemoryPool* pool) { std::unordered_map configs{ {velox::core::QueryConfig::kSparkPartitionId, "0"}, - {velox::core::QueryConfig::kSessionTimezone, sessionTimezone}}; + {velox::core::QueryConfig::kSessionTimezone, "UTC"}}; veloxCfg_ = std::make_shared(std::move(configs)); planConverter_ = std::make_unique( pool, veloxCfg_.get(), std::vector>{}, std::nullopt, std::nullopt, true); From fea81aab1b2d63840250bdabeb174c42b6a1ece9 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Thu, 9 Apr 2026 21:25:10 +0300 Subject: [PATCH 4/4] [VL] Fix code format in timezone validator --- cpp/velox/substrait/SubstraitToVeloxPlanValidator.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h index 4284d2fdb19e..6bfca5ec36b8 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h @@ -31,8 +31,7 @@ class SubstraitToVeloxPlanValidator { public: SubstraitToVeloxPlanValidator(memory::MemoryPool* pool) { std::unordered_map configs{ - {velox::core::QueryConfig::kSparkPartitionId, "0"}, - {velox::core::QueryConfig::kSessionTimezone, "UTC"}}; + {velox::core::QueryConfig::kSparkPartitionId, "0"}, {velox::core::QueryConfig::kSessionTimezone, "UTC"}}; veloxCfg_ = std::make_shared(std::move(configs)); planConverter_ = std::make_unique( pool, veloxCfg_.get(), std::vector>{}, std::nullopt, std::nullopt, true);