Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/spill_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ class SpillRecoverRunnable : public SpillRunnable {
}

void _on_task_started() override {
LOG(INFO) << "SpillRecoverRunnable, Query: " << print_id(_state->query_id())
<< " spill task started, pipeline task id: " << _state->task_id();
VLOG_DEBUG << "SpillRecoverRunnable, Query: " << print_id(_state->query_id())
<< " spill task started, pipeline task id: " << _state->task_id();
COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
COUNTER_UPDATE(_reading_task_count, 1);
}
Expand Down
73 changes: 47 additions & 26 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <glog/logging.h>

#include <algorithm>
#include <memory>
#include <ostream>
#include <vector>

Expand All @@ -35,6 +36,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "revokable_task.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
Expand Down Expand Up @@ -99,14 +101,15 @@ PipelineTask::~PipelineTask() {
// But pipeline task hold some objects, like operators, shared state, etc. So that should release
// memory manually.
#ifndef BE_TEST
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
if (_query_mem_tracker) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
}
#endif
_shared_state_map.clear();
_sink_shared_state.reset();
_op_shared_states.clear();
_sink.reset();
_operators.clear();
_spill_context.reset();
_block.reset();
_pipeline.reset();
}
Expand Down Expand Up @@ -306,17 +309,12 @@ bool PipelineTask::is_blockable() const {
}
}

return _need_to_revoke_memory ||
std::ranges::any_of(_operators,
return std::ranges::any_of(_operators,
[&](OperatorPtr op) -> bool { return op->is_blockable(_state); }) ||
_sink->is_blockable(_state);
}

bool PipelineTask::_is_blocked() {
if (_need_to_revoke_memory) {
return false;
}

// `_dry_run = true` means we do not need data from source operator.
if (!_dry_run) {
for (int i = cast_set<int>(_read_dependencies.size() - 1); i >= 0; i--) {
Expand Down Expand Up @@ -378,11 +376,15 @@ void PipelineTask::terminate() {
* @return
*/
Status PipelineTask::execute(bool* done) {
if (!_need_to_revoke_memory && (_exec_state != State::RUNNABLE || _blocked_dep != nullptr))
[[unlikely]] {
if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr) [[unlikely]] {
#ifdef BE_TEST
return Status::InternalError("Pipeline task is not runnable! Task info: {}",
debug_string());
#else
return Status::FatalError("Pipeline task is not runnable! Task info: {}", debug_string());
#endif
}

auto fragment_context = _fragment_context.lock();
if (!fragment_context) {
return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
Expand Down Expand Up @@ -477,11 +479,6 @@ Status PipelineTask::execute(bool* done) {
break;
}

if (_need_to_revoke_memory) {
_need_to_revoke_memory = false;
return _sink->revoke_memory(_state, _spill_context);
}

if (time_spent > _exec_time_slice) {
COUNTER_UPDATE(_yield_counts, 1);
break;
Expand Down Expand Up @@ -610,6 +607,33 @@ Status PipelineTask::execute(bool* done) {
return Status::OK();
}

Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
auto fragment_context = _fragment_context.lock();
if (!fragment_context) {
return Status::InternalError("Fragment already finished! Query: {}", print_id(_query_id));
}

SCOPED_ATTACH_TASK(_state);
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer running_defer {[&]() {
int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
_task_cpu_timer->update(delta_cpu_time);
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
delta_cpu_time);

// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
}
}};

return _sink->revoke_memory(_state, spill_context);
}

bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, OperatorBase* op) {
auto st = thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
COUNTER_UPDATE(_memory_reserve_times, 1);
Expand Down Expand Up @@ -794,30 +818,27 @@ std::string PipelineTask::debug_string() {
}

size_t PipelineTask::get_revocable_size() const {
if (is_finalized() || _running || (_eos && !_spilling)) {
if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
return 0;
}

return _sink->revocable_mem_size(_state);
}

Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
DCHECK(spill_context);
if (is_finalized()) {
if (spill_context) {
spill_context->on_task_finished();
VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
<< " finalized";
}
spill_context->on_task_finished();
VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
<< " finalized";
return Status::OK();
}

const auto revocable_size = _sink->revocable_mem_size(_state);
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
_need_to_revoke_memory = true;
_spill_context = spill_context;
RETURN_IF_ERROR(
_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
} else if (spill_context) {
auto revokable_task = std::make_shared<RevokableTask>(shared_from_this(), spill_context);
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(revokable_task));
} else {
spill_context->on_task_finished();
LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
<< " has not enough data to revoke: " << revocable_size;
Expand Down
40 changes: 23 additions & 17 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,32 +55,32 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
shared_state_map,
int task_idx);

~PipelineTask();
virtual ~PipelineTask();

Status prepare(const std::vector<TScanRangeParams>& scan_range, const int sender_id,
const TDataSink& tsink);

Status execute(bool* done);
virtual Status execute(bool* done);

// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status, bool close_sink = true);
virtual Status close(Status exec_status, bool close_sink = true);

std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }

int get_thread_id(int num_threads) const {
return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
}

PipelineTask& set_thread_id(int thread_id) {
virtual PipelineTask& set_thread_id(int thread_id) {
_thread_id = thread_id;
if (thread_id != _thread_id) {
COUNTER_UPDATE(_core_change_times, 1);
}
return *this;
}

Status finalize();
virtual Status finalize();

std::string debug_string();

Expand All @@ -94,7 +94,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
* Pipeline task is blockable means it will be blocked in the next run. So we should put it into
* the blocking task scheduler.
*/
bool is_blockable() const;
virtual bool is_blockable() const;

/**
* `shared_state` is shared by different pipeline tasks. This function aims to establish
Expand Down Expand Up @@ -125,7 +125,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
DataSinkOperatorPtr sink() const { return _sink; }

int task_id() const { return _index; };
bool is_finalized() const { return _exec_state == State::FINALIZED; }
virtual bool is_finalized() const { return _exec_state == State::FINALIZED; }

void set_wake_up_early(PipelineId wake_by = -1) {
_wake_up_early = true;
Expand Down Expand Up @@ -153,19 +153,24 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }

bool is_running() { return _running.load(); }
PipelineTask& set_running(bool running) {
_running.exchange(running);
return *this;
virtual bool set_running(bool running) {
bool old_value = !running;
_running.compare_exchange_weak(old_value, running);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a check for the return value by compare_exchange_weak

return old_value;
}

RuntimeState* runtime_state() const { return _state; }
virtual RuntimeState* runtime_state() const { return _state; }

virtual std::string task_name() const {
return fmt::format("task{}({})", _index, _pipeline->_name);
}

std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); }
[[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>& spill_context);

// TODO: Maybe we do not need this safe code anymore
void stop_if_finished();

PipelineId pipeline_id() const { return _pipeline->id(); }
virtual PipelineId pipeline_id() const { return _pipeline->id(); }
[[nodiscard]] size_t get_revocable_size() const;
[[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);

Expand All @@ -175,6 +180,10 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
return _state_transition(PipelineTask::State::BLOCKED);
}

protected:
// Only used for RevokableTask
PipelineTask() : _index(0) {}

private:
// Whether this task is blocked before execution (FE 2-phase commit trigger, runtime filters)
bool _wait_to_start();
Expand Down Expand Up @@ -214,9 +223,6 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;

bool _need_to_revoke_memory = false;
std::shared_ptr<SpillContext> _spill_context;

RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer = nullptr;
Expand Down
76 changes: 76 additions & 0 deletions be/src/pipeline/revokable_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>

#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_task.h"
#include "pipeline_task.h"

namespace doris {
class RuntimeState;

namespace pipeline {
class PipelineFragmentContext;

class RevokableTask : public PipelineTask {
public:
RevokableTask(PipelineTaskSPtr task, std::shared_ptr<SpillContext> spill_context)
: _task(std::move(task)), _spill_context(std::move(spill_context)) {}

~RevokableTask() override = default;

RuntimeState* runtime_state() const override { return _task->runtime_state(); }

Status close(Status exec_status, bool close_sink) override {
return _task->close(exec_status, close_sink);
}

Status finalize() override { return _task->finalize(); }

bool set_running(bool running) override { return _task->set_running(running); }

bool is_finalized() const override { return _task->is_finalized(); }

std::weak_ptr<PipelineFragmentContext>& fragment_context() override {
return _task->fragment_context();
}

PipelineTask& set_thread_id(int thread_id) override { return _task->set_thread_id(thread_id); }

PipelineId pipeline_id() const override { return _task->pipeline_id(); }

std::string task_name() const override { return _task->task_name(); }

Status execute(bool* done) override { return _task->do_revoke_memory(_spill_context); }

bool is_blockable() const override { return true; }

private:
PipelineTaskSPtr _task;
std::shared_ptr<SpillContext> _spill_context;
};

} // namespace pipeline
} // namespace doris
11 changes: 9 additions & 2 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,26 @@ void TaskScheduler::_do_work(int index) {
// The task is already running, maybe block in now dependency wake up by other thread
// but the block thread still hold the task, so put it back to the queue, until the hold
// thread set task->set_running(false)
if (task->is_running()) {
// set_running return the old value
if (task->set_running(true)) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
}

if (task->is_finalized()) {
task->set_running(false);
continue;
}

auto fragment_context = task->fragment_context().lock();
if (!fragment_context) {
// Fragment already finished
task->set_running(false);
continue;
}
task->set_running(true).set_thread_id(index);

task->set_thread_id(index);

bool done = false;
auto status = Status::OK();
int64_t exec_ns = 0;
Expand Down
Loading