diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 601db7014b1d65..ad89a7d56a9275 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -132,42 +132,35 @@ Status PriorityTaskQueue::push(PipelineTask* task) { MultiCoreTaskQueue::~MultiCoreTaskQueue() = default; -MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) { - _prio_task_queue_list = - std::make_shared>>(core_size); - for (int i = 0; i < core_size; i++) { - (*_prio_task_queue_list)[i] = std::make_unique(); - } -} +MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) + : TaskQueue(core_size), _prio_task_queue_list(core_size), _closed(false) {} void MultiCoreTaskQueue::close() { if (_closed) { return; } _closed = true; - for (int i = 0; i < _core_size; ++i) { - (*_prio_task_queue_list)[i]->close(); - } - std::atomic_store(&_prio_task_queue_list, - std::shared_ptr>>(nullptr)); + // close all priority task queue + std::ranges::for_each(_prio_task_queue_list, + [](auto& prio_task_queue) { prio_task_queue.close(); }); } PipelineTask* MultiCoreTaskQueue::take(int core_id) { PipelineTask* task = nullptr; while (!_closed) { - DCHECK(_prio_task_queue_list->size() > core_id) - << " list size: " << _prio_task_queue_list->size() << " core_id: " << core_id + DCHECK(_prio_task_queue_list.size() > core_id) + << " list size: " << _prio_task_queue_list.size() << " core_id: " << core_id << " _core_size: " << _core_size << " _next_core: " << _next_core.load(); - task = (*_prio_task_queue_list)[core_id]->try_take(false); + task = _prio_task_queue_list[core_id].try_take(false); if (task) { task->set_core_id(core_id); break; } - task = _steal_take(core_id, *_prio_task_queue_list); + task = _steal_take(core_id); if (task) { break; } - task = (*_prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); + task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); if (task) { task->set_core_id(core_id); break; @@ -179,8 +172,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) { return task; } -PipelineTask* MultiCoreTaskQueue::_steal_take( - int core_id, std::vector>& prio_task_queue_list) { +PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) { DCHECK(core_id < _core_size); int next_id = core_id; for (int i = 1; i < _core_size; ++i) { @@ -189,7 +181,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take( next_id = 0; } DCHECK(next_id < _core_size); - auto task = prio_task_queue_list[next_id]->try_take(true); + auto task = _prio_task_queue_list[next_id].try_take(true); if (task) { task->set_core_id(next_id); return task; @@ -209,7 +201,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) { Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) { DCHECK(core_id < _core_size); task->put_in_runnable_queue(); - return (*_prio_task_queue_list)[core_id]->push(task); + return _prio_task_queue_list[core_id].push(task); } } // namespace pipeline diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 01282be7be0d13..1c6d2be4929167 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -144,15 +144,14 @@ class MultiCoreTaskQueue : public TaskQueue { void update_statistics(PipelineTask* task, int64_t time_spent) override { task->inc_runtime_ns(time_spent); - (*_prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime( - task->get_queue_level(), time_spent); + _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(), + time_spent); } private: - PipelineTask* _steal_take( - int core_id, std::vector>& prio_task_queue_list); + PipelineTask* _steal_take(int core_id); - std::shared_ptr>> _prio_task_queue_list; + std::vector _prio_task_queue_list; std::atomic _next_core = 0; std::atomic _closed; }; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 13f9c0c99740b1..4e050f17b4e048 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -147,10 +147,12 @@ QueryContext::~QueryContext() { MemTracker::print_bytes(query_mem_tracker->peak_consumption())); } uint64_t group_id = 0; + std::string wg_name = ""; if (_workload_group) { group_id = _workload_group->id(); // before remove _workload_group->remove_mem_tracker_limiter(query_mem_tracker); _workload_group->remove_query(_query_id); + wg_name = _workload_group->name(); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); @@ -186,7 +188,8 @@ QueryContext::~QueryContext() { _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id); DorisMetrics::instance()->query_ctx_cnt->increment(-1); // the only one msg shows query's end. any other msg should append to it if need. - LOG(INFO) << fmt::format("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), + LOG(INFO) << fmt::format("Query {} deconstructed, use wg: {}, query type: {}, mem_tracker: {}", + print_id(this->_query_id), wg_name, _query_options.query_type, mem_tracker_msg); }