Skip to content

Commit 18323c3

Browse files
feat(native): Add endpoint for expression optimization in sidecar (#26475)
## Description To support constant folding and consistent semantics between the Presto Java coordinator and the Presto C++ worker, it is necessary to use consistent expression evaluation. To support this, a native expression evaluation endpoint, `v1/expressions`, has been added to the Presto C++ sidecar. This endpoint leverages the `ExprOptimizer` in Velox to optimize Presto expressions. The optimized Velox `core::TypedExpr` returned by Velox's `ExprOptimizer` is converted to a Presto `protocol::RowExpression` in the Presto native sidecar with helper class `VeloxToPrestoExprConverter`. The end to end flow between the coordinator and sidecar is as follows: ``` RowExpression == (PrestoToVeloxExpr) ==> TypedExpr == (Velox ExprOptimizer optimize() API) ==> optimized TypedExpr == (VeloxToPrestoExprConverter) ==> optimized RowExpression ``` When `OptimizerLevel` is `EVALUATED` and an error is encountered during expression evaluation, it is converted to `NativeSidecarFailureInfo` with presto protocol. The list of optimized `RowExpression`s and `NativeSidecarFailureInfo`s is returned along with their corresponding indices in the input expressions list. With the fuzzer testing (see test plan), we expect this endpoint to be ready for production workloads. ## Motivation and Context The `native-sidecar-plugin` will implement the `ExpressionOptimizer` interface from Presto SPI to utilize the `v1/expressions` endpoint on the sidecar for optimizing Presto expressions using the native expression evaluation engine. The primary goal is to achieve consistency between C++ and Java semantics for expression optimization. With this change, C++ functions including UDFs can be used for constant folding of expressions in the Presto planner. Please refer to [RFC-0006](https://github.com/prestodb/rfcs/blob/main/RFC-0006-expression-eval.md). ## Test Plan Tests have been added by abstracting testcases from `TestRowExpressionInterpreter` to an interface `AbstractTestExpressionInterpreter`. This test interface is implemented in `TestNativeExpressionInterpreter` to test the `v1/expressions` endpoint on the sidecar end to end. This feature is still in Beta, and to support production workloads with complete certainty, the Velox expression fuzzer will be extended to test this endpoint with fuzzer generated expressions in a follow-up PR. This should surface any remaining bugs, such as in `VeloxToPrestoExpr`. Note: The Velox `ExprOptimizer` and logical rewrites are already tested with fuzzer. ## Release Notes ``` Prestissimo (Native Execution) Changes * Add http endpoint `v1/expressions` in sidecar for expression optimization. See :doc:`/presto_cpp/sidecar`. ``` --------- Co-authored-by: Timothy Meehan <[email protected]>
1 parent 4f4142a commit 18323c3

File tree

19 files changed

+3564
-1706
lines changed

19 files changed

+3564
-1706
lines changed

presto-docs/src/main/sphinx/presto_cpp/sidecar.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ The following HTTP endpoints are implemented by the Presto C++ sidecar.
3838
validates the Velox plan. Returns any errors encountered during plan
3939
conversion.
4040

41+
.. function:: POST /v1/expressions
42+
43+
Optimizes a list of ``RowExpression``\s from the http request using
44+
a combination of constant folding and logical rewrites by leveraging
45+
the ``ExprOptimizer`` from Velox. Returns a list of ``RowExpressionOptimizationResult``,
46+
that contains either the optimized ``RowExpression`` or the ``NativeSidecarFailureInfo``
47+
in case the expression optimization failed.
48+
4149
Configuration Properties
4250
------------------------
4351

presto-main-base/src/test/java/com/facebook/presto/sql/TestExpressionInterpreter.java

Lines changed: 49 additions & 1654 deletions
Large diffs are not rendered by default.

presto-main-base/src/test/java/com/facebook/presto/sql/expressions/AbstractTestExpressionInterpreter.java

Lines changed: 1711 additions & 0 deletions
Large diffs are not rendered by default.

presto-native-execution/presto_cpp/main/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ target_link_libraries(
6161
$<TARGET_OBJECTS:presto_protocol>
6262
presto_common
6363
presto_exception
64+
presto_expression_optimizer
6465
presto_function_metadata
6566
presto_connectors
6667
presto_http

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <boost/asio/ip/host_name.hpp>
1717
#include <boost/asio/ip/tcp.hpp>
1818
#include <glog/logging.h>
19+
#include <proxygen/lib/http/HTTPHeaders.h>
1920
#include "presto_cpp/main/Announcer.h"
2021
#include "presto_cpp/main/CoordinatorDiscoverer.h"
2122
#include "presto_cpp/main/PeriodicMemoryChecker.h"
@@ -42,6 +43,7 @@
4243
#include "presto_cpp/main/operators/ShuffleExchangeSource.h"
4344
#include "presto_cpp/main/operators/ShuffleRead.h"
4445
#include "presto_cpp/main/operators/ShuffleWrite.h"
46+
#include "presto_cpp/main/types/ExpressionOptimizer.h"
4547
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
4648
#include "presto_cpp/main/types/VeloxPlanConversion.h"
4749
#include "velox/common/base/Counters.h"
@@ -100,6 +102,8 @@ constexpr char const* kTaskUriFormat =
100102
constexpr char const* kConnectorName = "connector.name";
101103
constexpr char const* kLinuxSharedLibExt = ".so";
102104
constexpr char const* kMacOSSharedLibExt = ".dylib";
105+
constexpr char const* kOptimized = "OPTIMIZED";
106+
constexpr char const* kEvaluated = "EVALUATED";
103107

104108
protocol::NodeState convertNodeState(presto::NodeState nodeState) {
105109
switch (nodeState) {
@@ -192,6 +196,50 @@ void unregisterVeloxCudf() {
192196
#endif
193197
}
194198

199+
json::array_t getOptimizedExpressions(
200+
const proxygen::HTTPHeaders& httpHeaders,
201+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
202+
folly::Executor* executor,
203+
velox::memory::MemoryPool* pool) {
204+
static constexpr char const* kOptimizerLevelHeader =
205+
"X-Presto-Expression-Optimizer-Level";
206+
const auto& optimizerLevelString =
207+
httpHeaders.getSingleOrEmpty(kOptimizerLevelHeader);
208+
VELOX_USER_CHECK(
209+
(optimizerLevelString == kOptimized) ||
210+
(optimizerLevelString == kEvaluated),
211+
"Optimizer level should be OPTIMIZED or EVALUATED, received {}.",
212+
optimizerLevelString);
213+
auto optimizerLevel = (optimizerLevelString == kOptimized)
214+
? expression::OptimizerLevel::kOptimized
215+
: expression::OptimizerLevel::kEvaluated;
216+
217+
static constexpr char const* kTimezoneHeader = "X-Presto-Time-Zone";
218+
const auto& timezone = httpHeaders.getSingleOrEmpty(kTimezoneHeader);
219+
std::unordered_map<std::string, std::string> config(
220+
{{velox::core::QueryConfig::kSessionTimezone, timezone},
221+
{velox::core::QueryConfig::kAdjustTimestampToTimezone, "true"}});
222+
auto queryConfig = velox::core::QueryConfig{std::move(config)};
223+
auto queryCtx =
224+
velox::core::QueryCtx::create(executor, std::move(queryConfig));
225+
226+
json input = json::parse(util::extractMessageBody(body));
227+
VELOX_USER_CHECK(input.is_array(), "Body of request should be a JSON array.");
228+
const json::array_t expressionList = static_cast<json::array_t>(input);
229+
std::vector<RowExpressionPtr> expressions;
230+
for (const auto& j : expressionList) {
231+
expressions.push_back(j);
232+
}
233+
const auto optimizedList = expression::optimizeExpressions(
234+
expressions, timezone, optimizerLevel, queryCtx.get(), pool);
235+
236+
json::array_t result;
237+
for (const auto& optimized : optimizedList) {
238+
result.push_back(optimized);
239+
}
240+
return result;
241+
}
242+
195243
} // namespace
196244

197245
std::string nodeState2String(NodeState nodeState) {
@@ -1727,6 +1775,18 @@ void PrestoServer::registerSidecarEndpoints() {
17271775
http::sendOkResponse(downstream, getFunctionsMetadata(catalog));
17281776
});
17291777
});
1778+
httpServer_->registerPost(
1779+
"/v1/expressions",
1780+
[this](
1781+
proxygen::HTTPMessage* message,
1782+
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
1783+
proxygen::ResponseHandler* downstream) {
1784+
const auto& httpHeaders = message->getHeaders();
1785+
const auto result = getOptimizedExpressions(
1786+
httpHeaders, body, driverExecutor_.get(), nativeWorkerPool_.get());
1787+
http::sendOkResponse(downstream, result);
1788+
});
1789+
17301790
httpServer_->registerPost(
17311791
"/v1/velox/plan",
17321792
[server = this](

presto-native-execution/presto_cpp/main/types/CMakeLists.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)
3737
add_library(presto_velox_plan_conversion OBJECT VeloxPlanConversion.cpp)
3838
target_link_libraries(presto_velox_plan_conversion velox_type)
3939

40+
add_library(presto_velox_to_presto_expr VeloxToPrestoExpr.cpp)
41+
42+
target_link_libraries(
43+
presto_velox_to_presto_expr
44+
presto_exception
45+
presto_type_converter
46+
presto_types
47+
presto_protocol
48+
)
49+
50+
add_library(presto_expression_optimizer ExpressionOptimizer.cpp)
51+
52+
target_link_libraries(presto_expression_optimizer presto_types presto_velox_to_presto_expr)
53+
4054
if(PRESTO_ENABLE_TESTING)
4155
add_subdirectory(tests)
4256
endif()
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
#include "presto_cpp/main/types/ExpressionOptimizer.h"
16+
#include "presto_cpp/main/common/Configs.h"
17+
#include "presto_cpp/main/common/Exception.h"
18+
#include "presto_cpp/main/http/HttpServer.h"
19+
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
20+
#include "presto_cpp/main/types/TypeParser.h"
21+
#include "presto_cpp/main/types/VeloxToPrestoExpr.h"
22+
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
23+
#include "velox/core/Expressions.h"
24+
#include "velox/expression/Expr.h"
25+
#include "velox/expression/ExprOptimizer.h"
26+
27+
namespace facebook::presto::expression {
28+
29+
namespace {
30+
31+
static const velox::expression::MakeFailExpr kMakeFailExpr =
32+
[](const std::string& error,
33+
const velox::TypePtr& type) -> velox::core::TypedExprPtr {
34+
return std::make_shared<velox::core::CastTypedExpr>(
35+
type,
36+
std::vector<velox::core::TypedExprPtr>{
37+
std::make_shared<velox::core::CallTypedExpr>(
38+
velox::UNKNOWN(),
39+
std::vector<velox::core::TypedExprPtr>{
40+
std::make_shared<velox::core::ConstantTypedExpr>(
41+
velox::VARCHAR(), error)},
42+
fmt::format(
43+
"{}fail",
44+
SystemConfig::instance()->prestoDefaultNamespacePrefix()))},
45+
false);
46+
};
47+
48+
// Tries to evaluate `expr`, irrespective of its determinism, to a constant
49+
// value.
50+
velox::VectorPtr tryEvaluateToConstant(
51+
const velox::core::TypedExprPtr& expr,
52+
velox::core::QueryCtx* queryCtx,
53+
velox::memory::MemoryPool* pool) {
54+
auto data =
55+
velox::BaseVector::create<velox::RowVector>(velox::ROW({}), 1, pool);
56+
velox::core::ExecCtx execCtx{pool, queryCtx};
57+
velox::exec::ExprSet exprSet({expr}, &execCtx);
58+
velox::exec::EvalCtx evalCtx(&execCtx, &exprSet, data.get());
59+
60+
const velox::SelectivityVector singleRow(1);
61+
std::vector<velox::VectorPtr> results(1);
62+
exprSet.eval(singleRow, evalCtx, results);
63+
return results.at(0);
64+
}
65+
66+
protocol::RowExpressionOptimizationResult optimizeExpression(
67+
const RowExpressionPtr& input,
68+
OptimizerLevel& optimizerLevel,
69+
const VeloxExprConverter& prestoToVeloxConverter,
70+
const expression::VeloxToPrestoExprConverter& veloxToPrestoConverter,
71+
velox::core::QueryCtx* queryCtx,
72+
velox::memory::MemoryPool* pool) {
73+
protocol::RowExpressionOptimizationResult result;
74+
const auto expr = prestoToVeloxConverter.toVeloxExpr(input);
75+
auto optimized =
76+
velox::expression::optimize(expr, queryCtx, pool, kMakeFailExpr);
77+
78+
if (optimizerLevel == OptimizerLevel::kEvaluated) {
79+
try {
80+
const auto evalResult = tryEvaluateToConstant(optimized, queryCtx, pool);
81+
optimized = std::make_shared<velox::core::ConstantTypedExpr>(evalResult);
82+
} catch (const velox::VeloxException& e) {
83+
result.expressionFailureInfo =
84+
toNativeSidecarFailureInfo(translateToPrestoException(e));
85+
result.optimizedExpression = nullptr;
86+
return result;
87+
} catch (const std::exception& e) {
88+
result.expressionFailureInfo =
89+
toNativeSidecarFailureInfo(translateToPrestoException(e));
90+
result.optimizedExpression = nullptr;
91+
return result;
92+
}
93+
}
94+
95+
result.optimizedExpression =
96+
veloxToPrestoConverter.getRowExpression(optimized, input);
97+
return result;
98+
}
99+
100+
} // namespace
101+
102+
std::vector<protocol::RowExpressionOptimizationResult> optimizeExpressions(
103+
const std::vector<RowExpressionPtr>& input,
104+
const std::string& timezone,
105+
OptimizerLevel& optimizerLevel,
106+
velox::core::QueryCtx* queryCtx,
107+
velox::memory::MemoryPool* pool) {
108+
TypeParser typeParser;
109+
const VeloxExprConverter prestoToVeloxConverter(pool, &typeParser);
110+
const expression::VeloxToPrestoExprConverter veloxToPrestoConverter(pool);
111+
std::vector<protocol::RowExpressionOptimizationResult> result;
112+
for (const auto& expression : input) {
113+
result.push_back(optimizeExpression(
114+
expression,
115+
optimizerLevel,
116+
prestoToVeloxConverter,
117+
veloxToPrestoConverter,
118+
queryCtx,
119+
pool));
120+
}
121+
return result;
122+
}
123+
124+
} // namespace facebook::presto::expression
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
#include "presto_cpp/external/json/nlohmann/json.hpp"
16+
#include "presto_cpp/presto_protocol/presto_protocol.h"
17+
#include "velox/common/memory/MemoryPool.h"
18+
#include "velox/core/QueryCtx.h"
19+
20+
using RowExpressionPtr =
21+
std::shared_ptr<facebook::presto::protocol::RowExpression>;
22+
23+
namespace facebook::presto::expression {
24+
25+
/// Optimizer level, which indicates the extent to which expressions should be
26+
/// optimized.
27+
/// TODO: This should be obtained from Presto protocol after refactoring the
28+
/// enum from ExpressionOptimizer in Presto SPI.
29+
enum class OptimizerLevel {
30+
/// Removes all redundancy in a RowExpression using the ExpressionOptimizer in
31+
/// Velox.
32+
kOptimized = 0,
33+
/// Attempts to evaluate the RowExpression into a constant, even when the
34+
/// expression is non-deterministic.
35+
kEvaluated,
36+
};
37+
38+
/// Optimizes the input list of RowExpressions. For each input RowExpression,
39+
/// the result is an optimized expression on success or failure info.
40+
/// @param input List of RowExpressions to be optimized.
41+
/// @param timezone Session timezone, received from Presto coordinator.
42+
/// @param optimizerLevel Optimizer level, received from Presto coordinator.
43+
/// The optimizerLevel can either be OPTIMIZED or EVALUATED. OPTIMIZED removes
44+
/// all redundancy in a RowExpression by leveraging the ExpressionOptimizer in
45+
/// Velox, and EVALUATED attempts to evaluate the RowExpression into a constant
46+
/// even when the expression is non-deterministic.
47+
/// @param queryCtx Query context to be used during optimization.
48+
/// @param pool Memory pool, required for expression evaluation.
49+
std::vector<protocol::RowExpressionOptimizationResult> optimizeExpressions(
50+
const std::vector<RowExpressionPtr>& input,
51+
const std::string& timezone,
52+
OptimizerLevel& optimizerLevel,
53+
velox::core::QueryCtx* queryCtx,
54+
velox::memory::MemoryPool* pool);
55+
} // namespace facebook::presto::expression

0 commit comments

Comments
 (0)