From 68120c45af300e12c1a838b24c77a8399dc3e6f3 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 28 Aug 2024 23:16:45 +0800 Subject: [PATCH] [fix](spill) should call set_ready after changing the status --- .../partitioned_aggregation_source_operator.cpp | 2 +- .../exec/partitioned_hash_join_sink_operator.cpp | 8 ++------ be/src/pipeline/exec/spill_sort_sink_operator.cpp | 13 +++++++------ be/src/pipeline/exec/spill_sort_source_operator.cpp | 2 +- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 5e030e7ab49d10..eef597b329d5c2 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -227,7 +227,6 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime } Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once(); _is_merging = false; - _dependency->Dependency::set_ready(); }}; bool has_agg_data = false; auto& parent = Base::_parent->template cast(); @@ -286,6 +285,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime if (!status.ok()) { _status = status; } + _dependency->set_ready(); }; DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index fc17ef41be62c8..3ec7e9c1955e29 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -139,7 +139,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta std::unique_lock lock(_spill_lock); _spill_status = status; _spill_status_ok = false; - _dependency->set_ready(); return false; } return true; @@ -187,7 +186,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta std::unique_lock lock(_spill_lock); _spill_status = st; _spill_status_ok = false; - _dependency->set_ready(); return; } partitions_indexes[partition_idx].clear(); @@ -202,8 +200,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta } } } - - _dependency->set_ready(); }; auto exception_catch_func = [spill_func, this]() mutable { @@ -216,8 +212,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta std::unique_lock lock(_spill_lock); _spill_status = status; _spill_status_ok = false; - _dependency->set_ready(); } + _dependency->set_ready(); }; auto spill_runnable = std::make_shared(state, _shared_state->shared_from_this(), @@ -294,7 +290,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { return Status::OK(); }(); - if (!status.OK()) { + if (!status.ok()) { std::unique_lock lock(_spill_lock); _dependency->set_ready(); _spill_status_ok = false; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 94196a0354e5cf..32db358556c1d2 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -231,12 +231,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { } _spilling_stream.reset(); - if (_eos) { - _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - _dependency->Dependency::set_ready(); - } }}; _shared_state->sink_status = @@ -279,6 +273,13 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { _shared_state->sink_status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + + if (_eos) { + _dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + _dependency->Dependency::set_ready(); + } }; DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 967e13d1fa527b..706d0762d92edb 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -102,7 +102,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill data finish"; } - _dependency->Dependency::set_ready(); }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; @@ -175,6 +174,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto exception_catch_func = [this, spill_func]() { _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + _dependency->Dependency::set_ready(); }; DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func", {