Skip to content

Commit

Permalink
[fix](multicast) should not ignore Status of block::merge apache#35886 (
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg authored and caiconghui1 committed Jun 24, 2024
1 parent 835b5bb commit c881a46
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
if (!_output_expr_contexts.empty()) {
output_block = &tmp_block;
}
_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
RETURN_IF_ERROR(_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos));

if (!_conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t
block->clear();
}

void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
if (pos_to_pull != _multi_cast_blocks.end()) {
Expand All @@ -41,11 +41,12 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
} else {
pos_to_pull->_used_count--;
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
(void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block);
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
pos_to_pull++;
}
}
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
return Status::OK();
}

void MultiCastDataStreamer::close_sender(int sender_idx) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MultiCastDataStreamer {

~MultiCastDataStreamer() = default;

void pull(int sender_idx, vectorized::Block* block, bool* eos);
Status pull(int sender_idx, vectorized::Block* block, bool* eos);

void close_sender(int sender_idx);

Expand Down

0 comments on commit c881a46

Please sign in to comment.