Skip to content

Commit

Permalink
[pipelineX](bug) Fix broadcast buffer reference count (apache#26545)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and 胥剑旭 committed Dec 14, 2023
1 parent 613a310 commit 4e26c43
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 29 deletions.
9 changes: 3 additions & 6 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,17 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
}

template <typename Parent>
Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request,
[[maybe_unused]] bool* sent) {
Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request) {
if (_is_finishing) {
request.block_holder->unref();
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
if (_is_receiver_eof(ins_id.lo)) {
request.block_holder->unref();
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
if (sent) {
*sent = true;
}
request.block_holder->ref();
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,16 @@ class BroadcastPBlockHolder {
BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0), _dep(dep) {}
~BroadcastPBlockHolder() noexcept = default;

void ref(int delta) noexcept { _ref_count._value.fetch_add(delta); }
void unref() noexcept;
void ref() noexcept { _ref_count._value.fetch_add(1); }
void ref() noexcept { ref(1); }

bool available() { return _ref_count._value == 0; }

PBlock* get_block() { return &pblock; }

private:
AtomicWrapper<uint32_t> _ref_count;
AtomicWrapper<int32_t> _ref_count;
PBlock pblock;
pipeline::BroadcastDependency* _dep;
};
Expand Down Expand Up @@ -177,7 +178,7 @@ class ExchangeSinkBuffer {
void register_sink(TUniqueId);

Status add_block(TransmitInfo<Parent>&& request);
Status add_block(BroadcastTransmitInfo<Parent>&& request, [[maybe_unused]] bool* sent);
Status add_block(BroadcastTransmitInfo<Parent>&& request);
bool can_write() const;
bool is_pending_finish();
void close();
Expand Down
21 changes: 8 additions & 13 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,30 +344,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
Status error_status = Status::OK();
bool sent = false;
local_state._broadcast_dependency->take_available_block();
block_holder->ref(local_state.channels.size());
Status status;
for (auto channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(
block_holder, &sent, source_state == SourceState::FINISHED);
}
if (status.is<ErrorCode::END_OF_FILE>()) {
_handle_eof_channel(state, channel, status);
} else if (!status.ok()) {
error_status = status;
break;
block_holder, source_state == SourceState::FINISHED);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
} else {
block_holder->unref();
}
}
if (sent) {
local_state._broadcast_dependency->take_available_block();
}
RETURN_IF_ERROR(error_status);
cur_block.clear_column_data();
local_state._serializer.get_block()->set_muatable_columns(
cur_block.mutate_columns());
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(_block_holder.get(),
nullptr, true);
true);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,19 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
block_holder->get_block()->Clear();
}
Status status;
block_holder->ref(_channels.size());
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, nullptr, eos);
status = channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
} else {
block_holder->unref();
}
}
cur_block.clear_column_data();
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,7 @@ class Channel {
virtual Status send_remote_block(PBlock* block, bool eos = false,
Status exec_status = Status::OK());

virtual Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
bool eos = false) {
virtual Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = false) {
return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
}

Expand Down Expand Up @@ -494,17 +493,17 @@ class PipChannel final : public Channel<Parent> {
return Status::OK();
}

Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] bool* sent,
bool eos = false) override {
Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = false) override {
COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
block->unref();
return Status::OK();
}
_eos_send = true;
}
if (eos || block->get_block()->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent));
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
}
return Status::OK();
}
Expand Down

0 comments on commit 4e26c43

Please sign in to comment.