diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index dfda613af52d52..04f869007da2b4 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -74,13 +74,30 @@ PipelineTask::PipelineTask( _task_idx(task_idx), _pipeline_name(_pipeline->name()) { _pipeline_task_watcher.start(); +#ifndef BE_TEST + _query_mem_tracker = fragment_context->get_query_ctx()->query_mem_tracker; +#endif _execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency()); auto shared_state = _sink->create_shared_state(); if (shared_state) { _sink_shared_state = shared_state; } } - +PipelineTask::~PipelineTask() { +// PipelineTask is also hold by task queue( https://github.com/apache/doris/pull/49753), +// so that it maybe the last one to be destructed. +// But pipeline task hold some objects, like operators, shared state, etc. So that should release +// memory manually. +#ifndef BE_TEST + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker); +#endif + _sink_shared_state.reset(); + _op_shared_states.clear(); + _sink.reset(); + _operators.clear(); + _block.reset(); + _pipeline.reset(); +} Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, QueryContext* query_ctx) { DCHECK(_sink); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 3a50436280d832..645879b7043bf8 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -54,6 +54,7 @@ class PipelineTask : public std::enable_shared_from_this { std::shared_ptr>> le_state_map, int task_idx); + ~PipelineTask(); Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, QueryContext* query_ctx); @@ -316,6 +317,8 @@ class PipelineTask : public std::enable_shared_from_this { std::atomic _eos = false; std::atomic _wake_up_early = false; const std::string _pipeline_name; + // PipelineTask maybe hold by TaskQueue + std::shared_ptr _query_mem_tracker; }; using PipelineTaskSPtr = std::shared_ptr;