diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 6f0c071d3911e8..a417c1fa997c7e 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -63,8 +63,6 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); } - - _executor.close = std::bind(&AggLocalState::_close_without_key, this); } else { if (p._needs_finalize) { _executor.get_result = std::bind( @@ -75,10 +73,9 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { &AggLocalState::_serialize_with_serialized_key_result, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); } - _executor.close = std::bind(&AggLocalState::_close_with_serialized_key, this); } - _agg_data_created_without_key = p._without_key; + _shared_state->agg_data_created_without_key = p._without_key; return Status::OK(); } @@ -91,39 +88,6 @@ Status AggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) { return Status::OK(); } -void AggLocalState::_close_with_serialized_key() { - std::visit( - [&](auto&& agg_method) -> void { - auto& data = *agg_method.hash_table; - data.for_each_mapped([&](auto& mapped) { - if (mapped) { - static_cast(_destroy_agg_status(mapped)); - mapped = nullptr; - } - }); - if (data.has_null_key_data()) { - auto st = _destroy_agg_status( - data.template get_null_key_data()); - if (!st) { - throw Exception(st.code(), st.to_string()); - } - } - }, - _agg_data->method_variant); - _release_tracker(); -} - -void AggLocalState::_close_without_key() { - //because prepare maybe failed, and couldn't create agg data. - //but finally call close to destory agg data, if agg data has bitmapValue - //will be core dump, it's not initialized - if (_agg_data_created_without_key) { - static_cast(_destroy_agg_status(_agg_data->without_key)); - _agg_data_created_without_key = false; - } - _release_tracker(); -} - Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { @@ -597,12 +561,6 @@ Status AggLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - for (auto* aggregate_evaluator : _shared_state->aggregate_evaluators) { - aggregate_evaluator->close(state); - } - if (_executor.close) { - _executor.close(); - } /// _hash_table_size_counter may be null if prepare failed. if (_hash_table_size_counter) { diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 9c6d3e0fd0dde2..9418301f150cfc 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -88,8 +88,6 @@ class AggLocalState final : public PipelineXLocalState { friend class DistinctStreamingAggSourceOperatorX; friend class DistinctStreamingAggSinkOperatorX; - void _close_without_key(); - void _close_with_serialized_key(); Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, SourceState& source_state); Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, @@ -122,11 +120,6 @@ class AggLocalState final : public PipelineXLocalState { } } } - void _release_tracker() { - Base::_shared_state->mem_tracker->release( - Base::_shared_state->mem_usage_record.used_in_state + - Base::_shared_state->mem_usage_record.used_in_arena); - } RuntimeProfile::Counter* _get_results_timer; RuntimeProfile::Counter* _serialize_result_timer; @@ -137,17 +130,14 @@ class AggLocalState final : public PipelineXLocalState { using vectorized_get_result = std::function; - using vectorized_closer = std::function; struct executor { vectorized_get_result get_result; - vectorized_closer close; }; executor _executor; vectorized::AggregatedDataVariants* _agg_data; - bool _agg_data_created_without_key = false; }; class AggSourceOperatorX : public OperatorX { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 13ae1cd9d7d58d..cd5fef95c5c729 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -57,6 +57,12 @@ static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); struct BasicSharedState { Dependency* source_dep; Dependency* sink_dep; + + std::atomic ref_count = 0; + + void ref() { ref_count++; } + virtual Status close(RuntimeState* state) { return Status::OK(); } + virtual ~BasicSharedState() = default; }; class Dependency : public std::enable_shared_from_this { @@ -110,16 +116,6 @@ class Dependency : public std::enable_shared_from_this { virtual void block() { _ready = false; } protected: - bool _should_log(uint64_t cur_time) { - if (cur_time < SLOW_DEPENDENCY_THRESHOLD) { - return false; - } - if ((cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) { - return false; - } - _last_log_time = cur_time; - return true; - } void _add_block_task(PipelineXTask* task); bool _is_cancelled() const { return push_to_blocking_queue() ? false : _query_ctx->is_cancelled(); @@ -134,10 +130,8 @@ class Dependency : public std::enable_shared_from_this { std::shared_ptr _shared_state {nullptr}; MonotonicStopWatch _watcher; - std::weak_ptr _parent; std::list> _children; - uint64_t _last_log_time = 0; std::mutex _task_lock; std::vector _blocked_task; }; @@ -249,11 +243,25 @@ struct AggSharedState : public BasicSharedState { agg_data = std::make_unique(); agg_arena_pool = std::make_unique(); } - virtual ~AggSharedState() = default; + ~AggSharedState() override = default; void init_spill_partition_helper(size_t spill_partition_count_bits) { spill_partition_helper = std::make_unique(spill_partition_count_bits); } + Status close(RuntimeState* state) override { + if (ref_count.fetch_sub(1) == 1) { + for (auto* aggregate_evaluator : aggregate_evaluators) { + aggregate_evaluator->close(state); + } + if (probe_expr_ctxs.empty()) { + _close_without_key(); + } else { + _close_with_serialized_key(); + } + } + return Status::OK(); + } + vectorized::AggregatedDataVariantsUPtr agg_data; std::unique_ptr aggregate_data_container; vectorized::AggSpillContext spill_context; @@ -280,6 +288,49 @@ struct AggSharedState : public BasicSharedState { }; MemoryRecord mem_usage_record; std::unique_ptr mem_tracker = std::make_unique("AggregateOperator"); + bool agg_data_created_without_key = false; + +private: + void _release_tracker() { + mem_tracker->release(mem_usage_record.used_in_state + mem_usage_record.used_in_arena); + } + void _close_with_serialized_key() { + std::visit( + [&](auto&& agg_method) -> void { + auto& data = *agg_method.hash_table; + data.for_each_mapped([&](auto& mapped) { + if (mapped) { + static_cast(_destroy_agg_status(mapped)); + mapped = nullptr; + } + }); + if (data.has_null_key_data()) { + auto st = _destroy_agg_status( + data.template get_null_key_data()); + if (!st) { + throw Exception(st.code(), st.to_string()); + } + } + }, + agg_data->method_variant); + _release_tracker(); + } + void _close_without_key() { + //because prepare maybe failed, and couldn't create agg data. + //but finally call close to destory agg data, if agg data has bitmapValue + //will be core dump, it's not initialized + if (agg_data_created_without_key) { + static_cast(_destroy_agg_status(agg_data->without_key)); + agg_data_created_without_key = false; + } + _release_tracker(); + } + Status _destroy_agg_status(vectorized::AggregateDataPtr data) { + for (int i = 0; i < aggregate_evaluators.size(); ++i) { + aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]); + } + return Status::OK(); + } }; struct SortSharedState : public BasicSharedState { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 050b198a22e22f..0eafada38a98e5 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -348,6 +348,7 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState auto& deps = info.upstream_dependencies; _dependency->set_shared_state(deps.front()->shared_state()); _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get(); + _shared_state->ref(); _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time"); _shared_state->source_dep = _dependency; @@ -382,6 +383,9 @@ Status PipelineXLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } + if (_shared_state) { + RETURN_IF_ERROR(_shared_state->close(state)); + } if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } @@ -410,6 +414,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _wait_for_dependency_timer = ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time"); } + _shared_state->ref(); } else { auto& deps = info.dependencys; deps.front() = std::make_shared(0, 0, state->get_query_ctx()); @@ -429,6 +434,9 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Statu if (_closed) { return Status::OK(); } + if (_shared_state) { + RETURN_IF_ERROR(_shared_state->close(state)); + } if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 3f9548099a2ffb..7d534c048b8e6e 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -326,7 +326,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase { protected: DependencyType* _dependency; - typename DependencyType::SharedState* _shared_state; + typename DependencyType::SharedState* _shared_state = nullptr; }; class DataSinkOperatorXBase; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index e832124bf8899b..b26e9ead695a1b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -149,6 +149,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // TODO pipeline incomp // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); } + if (reason == PPlanFragmentCancelReason::TIMEOUT) { + LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string(); + } for (auto& tasks : _tasks) { for (auto& task : tasks) { task->clear_blocking_state();