diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 394afab1e2b6c5..2da386e83a1c16 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -81,6 +81,9 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState _task_idx(task_idx), _memory_sufficient_dependency(state->get_query_ctx()->get_memory_sufficient_dependency()), _pipeline_name(_pipeline->name()) { +#ifndef BE_TEST + _query_mem_tracker = fragment_context->get_query_ctx()->query_mem_tracker(); +#endif _execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency()); if (!_shared_state_map.contains(_sink->dests_id().front())) { auto shared_state = _sink->create_shared_state(); @@ -90,6 +93,24 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState } } +PipelineTask::~PipelineTask() { +// PipelineTask is also hold by task queue( https://github.com/apache/doris/pull/49753), +// so that it maybe the last one to be destructed. +// But pipeline task hold some objects, like operators, shared state, etc. So that should release +// memory manually. +#ifndef BE_TEST + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker); +#endif + _shared_state_map.clear(); + _sink_shared_state.reset(); + _op_shared_states.clear(); + _sink.reset(); + _operators.clear(); + _spill_context.reset(); + _block.reset(); + _pipeline.reset(); +} + Status PipelineTask::prepare(const std::vector& scan_range, const int sender_id, const TDataSink& tsink) { DCHECK(_sink); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index ed27cc753b0b1f..416671384099fa 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -55,6 +55,8 @@ class PipelineTask : public std::enable_shared_from_this { shared_state_map, int task_idx); + ~PipelineTask(); + Status prepare(const std::vector& scan_range, const int sender_id, const TDataSink& tsink); @@ -259,6 +261,8 @@ class PipelineTask : public std::enable_shared_from_this { std::atomic _running {false}; std::atomic _eos {false}; std::atomic _wake_up_early {false}; + // PipelineTask maybe hold by TaskQueue + std::shared_ptr _query_mem_tracker; /** *