diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index d6f2a811f3a5e3..beb2ecaa984afa 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -201,8 +201,8 @@ class SpillRecoverRunnable : public SpillRunnable { } void _on_task_started() override { - LOG(INFO) << "SpillRecoverRunnable, Query: " << print_id(_state->query_id()) - << " spill task started, pipeline task id: " << _state->task_id(); + VLOG_DEBUG << "SpillRecoverRunnable, Query: " << print_id(_state->query_id()) + << " spill task started, pipeline task id: " << _state->task_id(); COUNTER_UPDATE(_read_wait_in_queue_task_count, -1); COUNTER_UPDATE(_reading_task_count, 1); } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9f1512982b9249..cee0c0562b250d 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -35,6 +36,7 @@ #include "pipeline/pipeline_fragment_context.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" +#include "revokable_task.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" @@ -99,14 +101,15 @@ PipelineTask::~PipelineTask() { // But pipeline task hold some objects, like operators, shared state, etc. So that should release // memory manually. #ifndef BE_TEST - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker); + if (_query_mem_tracker) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker); + } #endif _shared_state_map.clear(); _sink_shared_state.reset(); _op_shared_states.clear(); _sink.reset(); _operators.clear(); - _spill_context.reset(); _block.reset(); _pipeline.reset(); } @@ -306,17 +309,12 @@ bool PipelineTask::is_blockable() const { } } - return _need_to_revoke_memory || - std::ranges::any_of(_operators, + return std::ranges::any_of(_operators, [&](OperatorPtr op) -> bool { return op->is_blockable(_state); }) || _sink->is_blockable(_state); } bool PipelineTask::_is_blocked() { - if (_need_to_revoke_memory) { - return false; - } - // `_dry_run = true` means we do not need data from source operator. if (!_dry_run) { for (int i = cast_set(_read_dependencies.size() - 1); i >= 0; i--) { @@ -378,11 +376,15 @@ void PipelineTask::terminate() { * @return */ Status PipelineTask::execute(bool* done) { - if (!_need_to_revoke_memory && (_exec_state != State::RUNNABLE || _blocked_dep != nullptr)) - [[unlikely]] { + if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr) [[unlikely]] { +#ifdef BE_TEST return Status::InternalError("Pipeline task is not runnable! Task info: {}", debug_string()); +#else + return Status::FatalError("Pipeline task is not runnable! Task info: {}", debug_string()); +#endif } + auto fragment_context = _fragment_context.lock(); if (!fragment_context) { return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id)); @@ -477,11 +479,6 @@ Status PipelineTask::execute(bool* done) { break; } - if (_need_to_revoke_memory) { - _need_to_revoke_memory = false; - return _sink->revoke_memory(_state, _spill_context); - } - if (time_spent > _exec_time_slice) { COUNTER_UPDATE(_yield_counts, 1); break; @@ -610,6 +607,33 @@ Status PipelineTask::execute(bool* done) { return Status::OK(); } +Status PipelineTask::do_revoke_memory(const std::shared_ptr& spill_context) { + auto fragment_context = _fragment_context.lock(); + if (!fragment_context) { + return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id)); + } + + SCOPED_ATTACH_TASK(_state); + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); + Defer running_defer {[&]() { + int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time(); + _task_cpu_timer->update(delta_cpu_time); + fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms( + delta_cpu_time); + + // If task is woke up early, we should terminate all operators, and this task could be closed immediately. + if (_wake_up_early) { + terminate(); + THROW_IF_ERROR(_root->terminate(_state)); + THROW_IF_ERROR(_sink->terminate(_state)); + _eos = true; + } + }}; + + return _sink->revoke_memory(_state, spill_context); +} + bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) { auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size); COUNTER_UPDATE(_memory_reserve_times, 1); @@ -794,7 +818,7 @@ std::string PipelineTask::debug_string() { } size_t PipelineTask::get_revocable_size() const { - if (is_finalized() || _running || (_eos && !_spilling)) { + if (!_opened || is_finalized() || _running || (_eos && !_spilling)) { return 0; } @@ -802,22 +826,19 @@ size_t PipelineTask::get_revocable_size() const { } Status PipelineTask::revoke_memory(const std::shared_ptr& spill_context) { + DCHECK(spill_context); if (is_finalized()) { - if (spill_context) { - spill_context->on_task_finished(); - VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) - << " finalized"; - } + spill_context->on_task_finished(); + VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) + << " finalized"; return Status::OK(); } const auto revocable_size = _sink->revocable_mem_size(_state); if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - _need_to_revoke_memory = true; - _spill_context = spill_context; - RETURN_IF_ERROR( - _state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this())); - } else if (spill_context) { + auto revokable_task = std::make_shared(shared_from_this(), spill_context); + RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(revokable_task)); + } else { spill_context->on_task_finished(); LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) << " has not enough data to revoke: " << revocable_size; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 41019e4c59806e..e2d51858be4498 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -55,24 +55,24 @@ class PipelineTask : public std::enable_shared_from_this { shared_state_map, int task_idx); - ~PipelineTask(); + virtual ~PipelineTask(); Status prepare(const std::vector& scan_range, const int sender_id, const TDataSink& tsink); - Status execute(bool* done); + virtual Status execute(bool* done); // if the pipeline create a bunch of pipeline task // must be call after all pipeline task is finish to release resource - Status close(Status exec_status, bool close_sink = true); + virtual Status close(Status exec_status, bool close_sink = true); - std::weak_ptr& fragment_context() { return _fragment_context; } + virtual std::weak_ptr& fragment_context() { return _fragment_context; } int get_thread_id(int num_threads) const { return _thread_id == -1 ? _thread_id : _thread_id % num_threads; } - PipelineTask& set_thread_id(int thread_id) { + virtual PipelineTask& set_thread_id(int thread_id) { _thread_id = thread_id; if (thread_id != _thread_id) { COUNTER_UPDATE(_core_change_times, 1); @@ -80,7 +80,7 @@ class PipelineTask : public std::enable_shared_from_this { return *this; } - Status finalize(); + virtual Status finalize(); std::string debug_string(); @@ -94,7 +94,7 @@ class PipelineTask : public std::enable_shared_from_this { * Pipeline task is blockable means it will be blocked in the next run. So we should put it into * the blocking task scheduler. */ - bool is_blockable() const; + virtual bool is_blockable() const; /** * `shared_state` is shared by different pipeline tasks. This function aims to establish @@ -125,7 +125,7 @@ class PipelineTask : public std::enable_shared_from_this { DataSinkOperatorPtr sink() const { return _sink; } int task_id() const { return _index; }; - bool is_finalized() const { return _exec_state == State::FINALIZED; } + virtual bool is_finalized() const { return _exec_state == State::FINALIZED; } void set_wake_up_early(PipelineId wake_by = -1) { _wake_up_early = true; @@ -153,19 +153,24 @@ class PipelineTask : public std::enable_shared_from_this { void pop_out_runnable_queue() { _wait_worker_watcher.stop(); } bool is_running() { return _running.load(); } - PipelineTask& set_running(bool running) { - _running.exchange(running); - return *this; + virtual bool set_running(bool running) { + bool old_value = !running; + _running.compare_exchange_weak(old_value, running); + return old_value; } - RuntimeState* runtime_state() const { return _state; } + virtual RuntimeState* runtime_state() const { return _state; } + + virtual std::string task_name() const { + return fmt::format("task{}({})", _index, _pipeline->_name); + } - std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } + [[nodiscard]] Status do_revoke_memory(const std::shared_ptr& spill_context); // TODO: Maybe we do not need this safe code anymore void stop_if_finished(); - PipelineId pipeline_id() const { return _pipeline->id(); } + virtual PipelineId pipeline_id() const { return _pipeline->id(); } [[nodiscard]] size_t get_revocable_size() const; [[nodiscard]] Status revoke_memory(const std::shared_ptr& spill_context); @@ -175,6 +180,10 @@ class PipelineTask : public std::enable_shared_from_this { return _state_transition(PipelineTask::State::BLOCKED); } +protected: + // Only used for RevokableTask + PipelineTask() : _index(0) {} + private: // Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters) bool _wait_to_start(); @@ -214,9 +223,6 @@ class PipelineTask : public std::enable_shared_from_this { // 3 update task statistics(update _queue_level/_core_id) int _queue_level = 0; - bool _need_to_revoke_memory = false; - std::shared_ptr _spill_context; - RuntimeProfile* _parent_profile = nullptr; std::unique_ptr _task_profile; RuntimeProfile::Counter* _task_cpu_timer = nullptr; diff --git a/be/src/pipeline/revokable_task.h b/be/src/pipeline/revokable_task.h new file mode 100644 index 00000000000000..d4d253c2703422 --- /dev/null +++ b/be/src/pipeline/revokable_task.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "pipeline/dependency.h" +#include "pipeline/exec/operator.h" +#include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline.h" +#include "pipeline/pipeline_task.h" +#include "pipeline_task.h" + +namespace doris { +class RuntimeState; + +namespace pipeline { +class PipelineFragmentContext; + +class RevokableTask : public PipelineTask { +public: + RevokableTask(PipelineTaskSPtr task, std::shared_ptr spill_context) + : _task(std::move(task)), _spill_context(std::move(spill_context)) {} + + ~RevokableTask() override = default; + + RuntimeState* runtime_state() const override { return _task->runtime_state(); } + + Status close(Status exec_status, bool close_sink) override { + return _task->close(exec_status, close_sink); + } + + Status finalize() override { return _task->finalize(); } + + bool set_running(bool running) override { return _task->set_running(running); } + + bool is_finalized() const override { return _task->is_finalized(); } + + std::weak_ptr& fragment_context() override { + return _task->fragment_context(); + } + + PipelineTask& set_thread_id(int thread_id) override { return _task->set_thread_id(thread_id); } + + PipelineId pipeline_id() const override { return _task->pipeline_id(); } + + std::string task_name() const override { return _task->task_name(); } + + Status execute(bool* done) override { return _task->do_revoke_memory(_spill_context); } + + bool is_blockable() const override { return true; } + +private: + PipelineTaskSPtr _task; + std::shared_ptr _spill_context; +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 794a08155d18ea..228335d1aa0ba1 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -104,19 +104,26 @@ void TaskScheduler::_do_work(int index) { // The task is already running, maybe block in now dependency wake up by other thread // but the block thread still hold the task, so put it back to the queue, until the hold // thread set task->set_running(false) - if (task->is_running()) { + // set_running return the old value + if (task->set_running(true)) { static_cast(_task_queue.push_back(task, index)); continue; } + if (task->is_finalized()) { + task->set_running(false); continue; } + auto fragment_context = task->fragment_context().lock(); if (!fragment_context) { // Fragment already finished + task->set_running(false); continue; } - task->set_running(true).set_thread_id(index); + + task->set_thread_id(index); + bool done = false; auto status = Status::OK(); int64_t exec_ns = 0;