Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,30 @@ PipelineTask::PipelineTask(
_task_idx(task_idx),
_pipeline_name(_pipeline->name()) {
_pipeline_task_watcher.start();
#ifndef BE_TEST
_query_mem_tracker = fragment_context->get_query_ctx()->query_mem_tracker;
#endif
_execution_dependencies.push_back(state->get_query_ctx()->get_execution_dependency());
auto shared_state = _sink->create_shared_state();
if (shared_state) {
_sink_shared_state = shared_state;
}
}

PipelineTask::~PipelineTask() {
// PipelineTask is also hold by task queue( https://github.com/apache/doris/pull/49753),
// so that it maybe the last one to be destructed.
// But pipeline task hold some objects, like operators, shared state, etc. So that should release
// memory manually.
#ifndef BE_TEST
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
#endif
_sink_shared_state.reset();
_op_shared_states.clear();
_sink.reset();
_operators.clear();
_block.reset();
_pipeline.reset();
}
Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
QueryContext* query_ctx) {
DCHECK(_sink);
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
std::shared_ptr<Dependency>>>
le_state_map,
int task_idx);
~PipelineTask();

Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
QueryContext* query_ctx);
Expand Down Expand Up @@ -316,6 +317,8 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_early = false;
const std::string _pipeline_name;
// PipelineTask maybe hold by TaskQueue
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
};

using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
Expand Down
Loading