From 6c9869678a8c4c39e577db43ed8b68e89e8c4993 Mon Sep 17 00:00:00 2001 From: jacktengg <18241664+jacktengg@users.noreply.github.com> Date: Wed, 5 Jun 2024 09:49:09 +0800 Subject: [PATCH] [fix](multicast) should not ignore Status of block::merge --- be/src/pipeline/exec/multi_cast_data_stream_source.cpp | 3 ++- be/src/pipeline/exec/multi_cast_data_streamer.cpp | 5 +++-- be/src/pipeline/exec/multi_cast_data_streamer.h | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index a7bb6462964b3d..36466e2c7fe637 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -87,7 +87,8 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, if (!local_state._output_expr_contexts.empty()) { output_block = &tmp_block; } - local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, output_block, eos); + RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id, + output_block, eos)); if (!local_state._conjuncts.empty()) { RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 24fa3217e3d821..d3047c42a2ddae 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -29,7 +29,7 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t block->clear(); } -void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { +Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { std::lock_guard l(_mutex); auto& pos_to_pull = _sender_pos_to_read[sender_idx]; if (pos_to_pull != _multi_cast_blocks.end()) { @@ -43,7 +43,7 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block } else { pos_to_pull->_used_count--; pos_to_pull->_block->create_same_struct_block(0)->swap(*block); - (void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block); + RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block)); pos_to_pull++; } } @@ -51,6 +51,7 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block if (pos_to_pull == _multi_cast_blocks.end()) { _block_reading(sender_idx); } + return Status::OK(); } void MultiCastDataStreamer::close_sender(int sender_idx) { diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index e812067e52c804..0a1276c4f1b097 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -50,7 +50,7 @@ class MultiCastDataStreamer { ~MultiCastDataStreamer() = default; - void pull(int sender_idx, vectorized::Block* block, bool* eos); + Status pull(int sender_idx, vectorized::Block* block, bool* eos); void close_sender(int sender_idx);