Skip to content

Commit

Permalink
[fix](spill) should call set_ready after changing the status
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Aug 29, 2024
1 parent b805f9e commit 70203c5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
}
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
_is_merging = false;
_dependency->Dependency::set_ready();
}};
bool has_agg_data = false;
auto& parent = Base::_parent->template cast<Parent>();
Expand Down Expand Up @@ -286,6 +285,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
if (!status.ok()) {
_status = status;
}
_dependency->set_ready();
};

DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
Expand Down
8 changes: 2 additions & 6 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = status;
_spill_status_ok = false;
_dependency->set_ready();
return false;
}
return true;
Expand Down Expand Up @@ -187,7 +186,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = st;
_spill_status_ok = false;
_dependency->set_ready();
return;
}
partitions_indexes[partition_idx].clear();
Expand All @@ -202,8 +200,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
}
}
}

_dependency->set_ready();
};

auto exception_catch_func = [spill_func, this]() mutable {
Expand All @@ -216,8 +212,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = status;
_spill_status_ok = false;
_dependency->set_ready();
}
_dependency->set_ready();
};

auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(),
Expand Down Expand Up @@ -294,7 +290,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
return Status::OK();
}();

if (!status.OK()) {
if (!status.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_dependency->set_ready();
_spill_status_ok = false;
Expand Down
13 changes: 7 additions & 6 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
}

_spilling_stream.reset();
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_dependency->Dependency::set_ready();
}
}};

_shared_state->sink_status =
Expand Down Expand Up @@ -279,6 +273,13 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
_shared_state->sink_status = [&]() {
RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
}();

if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
_dependency->Dependency::set_ready();
}
};

DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/spill_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id()
<< " merge spill data finish";
}
_dependency->Dependency::set_ready();
}};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
Expand Down Expand Up @@ -175,6 +174,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat

auto exception_catch_func = [this, spill_func]() {
_status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
_dependency->Dependency::set_ready();
};

DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func", {
Expand Down

0 comments on commit 70203c5

Please sign in to comment.