2929#include " runtime/data_stream_mgr.h"
3030#include " runtime/descriptors.h"
3131#include " runtime/exec_env.h"
32+ #include " runtime/local_pass_through_buffer.h"
3233#include " runtime/runtime_state.h"
3334#include " serde/compress_strategy.h"
3435#include " serde/protobuf_serde.h"
@@ -46,13 +47,15 @@ class ExchangeSinkOperator::Channel {
4647 // how much tuple data is getting accumulated before being sent; it only applies
4748 // when data is added via add_row() and not sent directly via send_batch().
4849 Channel (ExchangeSinkOperator* parent, const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
49- PlanNodeId dest_node_id, int32_t num_shuffles, bool enable_exchange_pass_through, bool enable_exchange_perf)
50+ PlanNodeId dest_node_id, int32_t num_shuffles, bool enable_exchange_pass_through, bool enable_exchange_perf,
51+ PassThroughChunkBuffer* pass_through_chunk_buffer)
5052 : _parent(parent),
5153 _brpc_dest_addr (brpc_dest),
5254 _fragment_instance_id(fragment_instance_id),
5355 _dest_node_id(dest_node_id),
5456 _enable_exchange_pass_through(enable_exchange_pass_through),
5557 _enable_exchange_perf(enable_exchange_perf),
// The pass-through context is built from the caller-supplied buffer pointer together with this
// channel's destination identity (fragment instance id and destination node id).
58+ _pass_through_context(pass_through_chunk_buffer, fragment_instance_id, dest_node_id),
// _chunks is created with num_shuffles entries; the constructor body itself does no further work.
5659 _chunks(num_shuffles) {}
5760
5861 // Initialize channel.
@@ -114,6 +117,7 @@ class ExchangeSinkOperator::Channel {
114117 // enable it to profile exchange's performance, which ignores computing local data for exchange_speed/_bytes,
115118 // because local data isn't accessed by remote network.
116119 const bool _enable_exchange_perf;
120+ PassThroughContext _pass_through_context;
117121
118122 bool _is_first_chunk = true ;
119123 std::shared_ptr<PInternalService_RecoverableStub> _brpc_stub = nullptr ;
@@ -123,8 +127,6 @@ class ExchangeSinkOperator::Channel {
123127 // If pipeline level shuffle is disabled, the size of _chunks
124128 // is always 1
125129 std::vector<std::unique_ptr<Chunk>> _chunks;
126- ChunkPassThroughVectorPtr _pass_through_chunks;
127- int64_t _pass_through_physical_bytes = 0 ;
128130 PTransmitChunkParamsPtr _chunk_request;
129131 size_t _current_request_bytes = 0 ;
130132
@@ -152,6 +154,7 @@ bool ExchangeSinkOperator::Channel::_check_use_pass_through() {
152154}
153155
// Prepares this channel for pass-through: initializes the pass-through context first, then caches
// the result of _check_use_pass_through() in _use_pass_through.
154156void ExchangeSinkOperator::Channel::_prepare_pass_through () {
157+ _pass_through_context.init ();
155158 _use_pass_through = _check_use_pass_through ();
156159}
157160
@@ -226,8 +229,6 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const
226229 _chunk_request->set_node_id (_dest_node_id);
227230 _chunk_request->set_sender_id (_parent->_sender_id );
228231 _chunk_request->set_be_number (_parent->_be_number );
229- _pass_through_chunks = std::make_unique<ChunkPassThroughVector>();
230- _pass_through_physical_bytes = 0 ;
231232 if (_parent->_is_pipeline_level_shuffle ) {
232233 _chunk_request->set_is_pipeline_level_shuffle (true );
233234 }
@@ -236,19 +237,18 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const
236237 // If chunk is not null, append it to request
237238 if (chunk != nullptr ) {
238239 if (_use_pass_through) {
239- int64_t before_bytes = CurrentThread::current ().get_consumed_bytes ();
240- auto clone = chunk->clone_unique ();
241- int64_t physical_bytes = CurrentThread::current ().get_consumed_bytes () - before_bytes;
242- _pass_through_physical_bytes += physical_bytes;
243240 size_t chunk_size = serde::ProtobufChunkSerde::max_serialized_size (*chunk);
244- _pass_through_chunks->emplace_back (std::move (clone), driver_sequence, chunk_size, physical_bytes);
245- COUNTER_UPDATE (_parent->_bytes_pass_through_counter , chunk_size);
241+ // A driver sequence of -1 means pipeline level shuffle is disabled
242+ TRY_CATCH_BAD_ALLOC (
243+ _pass_through_context.append_chunk (_parent->_sender_id , chunk, chunk_size,
244+ _parent->_is_pipeline_level_shuffle ? driver_sequence : -1 ));
246245 _current_request_bytes += chunk_size;
246+ COUNTER_UPDATE (_parent->_bytes_pass_through_counter , chunk_size);
247+ COUNTER_SET (_parent->_pass_through_buffer_peak_mem_usage , _pass_through_context.total_bytes ());
247248 } else {
248249 if (_parent->_is_pipeline_level_shuffle ) {
249250 _chunk_request->add_driver_sequences (driver_sequence);
250251 }
251-
252252 auto pchunk = _chunk_request->add_chunks ();
253253 TRY_CATCH_BAD_ALLOC (RETURN_IF_ERROR (_parent->serialize_chunk (chunk, pchunk, &_is_first_chunk)));
254254 _current_request_bytes += pchunk->data ().size ();
@@ -261,21 +261,12 @@ Status ExchangeSinkOperator::Channel::send_one_chunk(RuntimeState* state, const
261261 _chunk_request->set_eos (eos);
262262 _chunk_request->set_use_pass_through (_use_pass_through);
263263 butil::IOBuf attachment;
264- int64_t physical_bytes = _use_pass_through ? _pass_through_physical_bytes
265- : _parent->construct_brpc_attachment (_chunk_request, attachment);
266- TransmitChunkInfo info = {this ->_fragment_instance_id ,
267- _brpc_stub,
268- std::move (_chunk_request),
269- std::move (_pass_through_chunks),
270- state->exec_env ()->stream_mgr (),
271- attachment,
272- physical_bytes,
273- _brpc_dest_addr};
264+ int64_t attachment_physical_bytes = _parent->construct_brpc_attachment (_chunk_request, attachment);
265+ TransmitChunkInfo info = {this ->_fragment_instance_id , _brpc_stub, std::move (_chunk_request), attachment,
266+ attachment_physical_bytes, _brpc_dest_addr};
274267 RETURN_IF_ERROR (_parent->_buffer ->add_request (info));
275268 _current_request_bytes = 0 ;
276269 _chunk_request.reset ();
277- _pass_through_chunks = std::make_unique<ChunkPassThroughVector>();
278- _pass_through_physical_bytes = 0 ;
279270 *is_real_sent = true ;
280271 }
281272
@@ -293,8 +284,8 @@ Status ExchangeSinkOperator::Channel::send_chunk_request(RuntimeState* state, PT
293284 chunk_request->set_be_number (_parent->_be_number );
294285 chunk_request->set_eos (false );
295286 chunk_request->set_use_pass_through (_use_pass_through);
296- TransmitChunkInfo info = {this ->_fragment_instance_id , _brpc_stub, std::move (chunk_request), nullptr ,
297- state-> exec_env ()-> stream_mgr (), attachment, attachment_physical_bytes, _brpc_dest_addr};
287+ TransmitChunkInfo info = {this ->_fragment_instance_id , _brpc_stub, std::move (chunk_request), attachment ,
288+ attachment_physical_bytes, _brpc_dest_addr};
298289 RETURN_IF_ERROR (_parent->_buffer ->add_request (info));
299290
300291 return Status::OK ();
@@ -351,6 +342,10 @@ ExchangeSinkOperator::ExchangeSinkOperator(
351342 _output_columns(output_columns),
352343 _num_sinkers(num_sinkers) {
353344 std::map<int64_t , int64_t > fragment_id_to_channel_index;
345+ RuntimeState* state = fragment_ctx->runtime_state ();
346+
347+ PassThroughChunkBuffer* pass_through_chunk_buffer =
348+ state->exec_env ()->stream_mgr ()->get_pass_through_chunk_buffer (state->query_id ());
354349
355350 _channels.reserve (destinations.size ());
356351 std::vector<int > driver_sequence_per_channel (destinations.size (), 0 );
@@ -364,7 +359,7 @@ ExchangeSinkOperator::ExchangeSinkOperator(
364359 } else {
365360 std::unique_ptr<Channel> channel = std::make_unique<Channel>(
366361 this , destination.brpc_server , fragment_instance_id, dest_node_id, _num_shuffles_per_channel,
367- enable_exchange_pass_through, enable_exchange_perf);
362+ enable_exchange_pass_through, enable_exchange_perf, pass_through_chunk_buffer );
368363 _channels.emplace_back (channel.get ());
369364 _instance_id2channel.emplace (fragment_instance_id.lo , std::move (channel));
370365 }
@@ -460,6 +455,9 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
460455 _shuffle_chunk_append_counter = ADD_COUNTER (_unique_metrics, " ShuffleChunkAppendCounter" , TUnit::UNIT);
461456 _shuffle_chunk_append_timer = ADD_TIMER (_unique_metrics, " ShuffleChunkAppendTime" );
462457 _compress_timer = ADD_TIMER (_unique_metrics, " CompressTime" );
458+ _pass_through_buffer_peak_mem_usage = _unique_metrics->AddHighWaterMarkCounter (
459+ " PassThroughBufferPeakMemoryUsage" , TUnit::BYTES,
460+ RuntimeProfile::Counter::create_strategy (TUnit::BYTES, TCounterMergeType::SKIP_FIRST_MERGE));
463461
464462 for (auto & [_, channel] : _instance_id2channel) {
465463 RETURN_IF_ERROR (channel->init (state));
@@ -651,10 +649,8 @@ Status ExchangeSinkOperator::set_finishing(RuntimeState* state) {
651649 butil::IOBuf attachment;
652650 const int64_t attachment_physical_bytes = construct_brpc_attachment (_chunk_request, attachment);
653651 for (const auto & [_, channel] : _instance_id2channel) {
654- if (!channel->use_pass_through ()) {
655- PTransmitChunkParamsPtr copy = std::make_shared<PTransmitChunkParams>(*_chunk_request);
656- RETURN_IF_ERROR (channel->send_chunk_request (state, copy, attachment, attachment_physical_bytes));
657- }
652+ PTransmitChunkParamsPtr copy = std::make_shared<PTransmitChunkParams>(*_chunk_request);
653+ RETURN_IF_ERROR (channel->send_chunk_request (state, copy, attachment, attachment_physical_bytes));
658654 }
659655 _current_request_bytes = 0 ;
660656 _chunk_request.reset ();
0 commit comments