Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,42 +132,35 @@ Status PriorityTaskQueue::push(PipelineTask* task) {

MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;

MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size) : TaskQueue(core_size), _closed(false) {
_prio_task_queue_list =
std::make_shared<std::vector<std::unique_ptr<PriorityTaskQueue>>>(core_size);
for (int i = 0; i < core_size; i++) {
(*_prio_task_queue_list)[i] = std::make_unique<PriorityTaskQueue>();
}
}
MultiCoreTaskQueue::MultiCoreTaskQueue(int core_size)
: TaskQueue(core_size), _prio_task_queue_list(core_size), _closed(false) {}

void MultiCoreTaskQueue::close() {
if (_closed) {
return;
}
_closed = true;
for (int i = 0; i < _core_size; ++i) {
(*_prio_task_queue_list)[i]->close();
}
std::atomic_store(&_prio_task_queue_list,
std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>>(nullptr));
// close all priority task queue
std::ranges::for_each(_prio_task_queue_list,
[](auto& prio_task_queue) { prio_task_queue.close(); });
}

PipelineTask* MultiCoreTaskQueue::take(int core_id) {
PipelineTask* task = nullptr;
while (!_closed) {
DCHECK(_prio_task_queue_list->size() > core_id)
<< " list size: " << _prio_task_queue_list->size() << " core_id: " << core_id
DCHECK(_prio_task_queue_list.size() > core_id)
<< " list size: " << _prio_task_queue_list.size() << " core_id: " << core_id
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = (*_prio_task_queue_list)[core_id]->try_take(false);
task = _prio_task_queue_list[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id, *_prio_task_queue_list);
task = _steal_take(core_id);
if (task) {
break;
}
task = (*_prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
Expand All @@ -179,8 +172,7 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
return task;
}

PipelineTask* MultiCoreTaskQueue::_steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list) {
PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(core_id < _core_size);
int next_id = core_id;
for (int i = 1; i < _core_size; ++i) {
Expand All @@ -189,7 +181,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(
next_id = 0;
}
DCHECK(next_id < _core_size);
auto task = prio_task_queue_list[next_id]->try_take(true);
auto task = _prio_task_queue_list[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
Expand All @@ -209,7 +201,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return (*_prio_task_queue_list)[core_id]->push(task);
return _prio_task_queue_list[core_id].push(task);
}

} // namespace pipeline
Expand Down
9 changes: 4 additions & 5 deletions be/src/pipeline/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,14 @@ class MultiCoreTaskQueue : public TaskQueue {

void update_statistics(PipelineTask* task, int64_t time_spent) override {
task->inc_runtime_ns(time_spent);
(*_prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(
task->get_queue_level(), time_spent);
_prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
}

private:
PipelineTask* _steal_take(
int core_id, std::vector<std::unique_ptr<PriorityTaskQueue>>& prio_task_queue_list);
PipelineTask* _steal_take(int core_id);

std::shared_ptr<std::vector<std::unique_ptr<PriorityTaskQueue>>> _prio_task_queue_list;
std::vector<PriorityTaskQueue> _prio_task_queue_list;
std::atomic<uint32_t> _next_core = 0;
std::atomic<bool> _closed;
};
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ QueryContext::~QueryContext() {
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
}
uint64_t group_id = 0;
std::string wg_name = "";
if (_workload_group) {
group_id = _workload_group->id(); // before remove
_workload_group->remove_mem_tracker_limiter(query_mem_tracker);
_workload_group->remove_query(_query_id);
wg_name = _workload_group->name();
}

_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
Expand Down Expand Up @@ -186,7 +188,8 @@ QueryContext::~QueryContext() {
_exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
DorisMetrics::instance()->query_ctx_cnt->increment(-1);
// the only one msg shows query's end. any other msg should append to it if need.
LOG(INFO) << fmt::format("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id),
LOG(INFO) << fmt::format("Query {} deconstructed, use wg: {}, query type: {}, mem_tracker: {}",
print_id(this->_query_id), wg_name, _query_options.query_type,
mem_tracker_msg);
}

Expand Down
Loading