Commit af41e49

[minor](annotation) Do refactoring and complete annotations

Gabriel39 committed Sep 24, 2024
1 parent 7a7bc6a commit af41e49
Showing 3 changed files with 79 additions and 94 deletions.
143 changes: 62 additions & 81 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -112,7 +112,6 @@
#include "vec/runtime/vdata_stream_mgr.h"

namespace doris::pipeline {
bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");

PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx,
@@ -181,9 +180,10 @@ void PipelineFragmentContext::cancel(const Status reason) {
LOG(WARNING) << "PipelineFragmentContext is cancelled due to timeout : " << debug_string();
}

// `ILLEGAL_STATE` means the query this fragment belongs to was not found in FE (maybe it already finished)
if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
this->debug_string());
debug_string());
}

_query_ctx->cancel(reason, _fragment_id);
@@ -208,18 +208,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
}
}

PipelinePtr PipelineFragmentContext::add_pipeline() {
// _prepared、_submitted, _canceled should do not add pipeline
PipelineId id = _next_pipeline_id++;
auto pipeline = std::make_shared<Pipeline>(
id, _num_instances,
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
_pipelines.emplace_back(pipeline);
return pipeline;
}

PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
// _prepared、_submitted, _canceled should do not add pipeline
PipelineId id = _next_pipeline_id++;
auto pipeline = std::make_shared<Pipeline>(
id, _num_instances,
@@ -229,7 +218,9 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
} else {
_pipelines.emplace_back(pipeline);
}
parent->set_children(pipeline);
if (parent) {
parent->set_children(pipeline);
}
return pipeline;
}
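Note: the parameterless overload above is folded into the parameterized one (see the header change below, where the declaration becomes add_pipeline(PipelinePtr parent = nullptr, int idx = -1)). A hypothetical call-site sketch, assuming a valid PipelineFragmentContext* ctx:

    PipelinePtr root  = ctx->add_pipeline();         // no parent: pipeline is appended to _pipelines
    PipelinePtr child = ctx->add_pipeline(root, 0);  // inserted at index 0; the null check above
                                                     // now guards parent->set_children(pipeline)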

@@ -247,7 +238,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
SCOPED_TIMER(_prepare_timer);
_build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
_init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
_plan_local_shuffle_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalShuffleTime");
_plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalExchangerTime");
_build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
_prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
{
@@ -334,14 +325,15 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
}
}
if (_enable_local_shuffle()) {
SCOPED_TIMER(_plan_local_shuffle_timer);
// 4. Build local exchanger
if (_runtime_state->enable_local_shuffle()) {
SCOPED_TIMER(_plan_local_exchanger_timer);
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}

// 4. Initialize global states in pipelines.
// 5. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
SCOPED_TIMER(_prepare_all_pipelines_timer);
pipeline->children().clear();
@@ -350,7 +342,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re

{
SCOPED_TIMER(_build_tasks_timer);
// 5. Build pipeline tasks and initialize local state.
// 6. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
}
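Note: with local exchanger planning promoted to its own numbered step, the visible part of prepare() reads as a fixed sequence; a condensed outline (steps 1-3 fall outside the hunks shown here):

    // 1-3. Init context, build pipelines from the plan, wire each sink to its operators.
    // 4.   _plan_local_exchange(...)   -- only if _runtime_state->enable_local_shuffle()
    // 5.   pipeline->prepare(...)      -- initialize global states, per pipeline
    // 6.   _build_pipeline_tasks(...)  -- build tasks and initialize local state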

@@ -377,40 +369,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
_fragment_instance_ids[i] = fragment_instance_id;
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);

runtime_state->set_task_execution_context(shared_from_this());
runtime_state->set_be_number(local_params.backend_num);

if (request.__isset.backend_id) {
runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
runtime_state->set_load_job_id(request.load_job_id);
}
if (request.__isset.wal_id) {
runtime_state->set_wal_id(request.wal_id);
}

runtime_state->set_desc_tbl(_desc_tbl);
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
runtime_state->set_num_per_fragment_instances(request.num_senders);
runtime_state->resize_op_id_to_local_state(max_operator_id());
runtime_state->set_max_operator_id(max_operator_id());
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
runtime_state->set_total_load_streams(request.total_load_streams);
runtime_state->set_num_local_sink(request.num_local_sink);
DCHECK(runtime_filter_mgr);
runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
};

auto filterparams = std::make_unique<RuntimeFilterParamsContext>();

@@ -429,8 +387,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
filterparams->query_ctx = _query_ctx.get();
}

// build local_runtime_filter_mgr for each instance
runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
request.query_id, filterparams.get(), _query_ctx->query_mem_tracker);

filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
@@ -467,7 +424,41 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
request.fragment_id, request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get());
auto& task_runtime_state = _task_runtime_states[pip_idx][i];
init_runtime_state(task_runtime_state);
{
// Initialize runtime state for this task
task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);

task_runtime_state->set_task_execution_context(shared_from_this());
task_runtime_state->set_be_number(local_params.backend_num);

if (request.__isset.backend_id) {
task_runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
task_runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
task_runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
task_runtime_state->set_load_job_id(request.load_job_id);
}
if (request.__isset.wal_id) {
task_runtime_state->set_wal_id(request.wal_id);
}

task_runtime_state->set_desc_tbl(_desc_tbl);
task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
task_runtime_state->set_num_per_fragment_instances(request.num_senders);
task_runtime_state->resize_op_id_to_local_state(max_operator_id());
task_runtime_state->set_max_operator_id(max_operator_id());
task_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
task_runtime_state->set_total_load_streams(request.total_load_streams);
task_runtime_state->set_num_local_sink(request.num_local_sink);
DCHECK(_runtime_filter_states[i]->runtime_filter_mgr);
task_runtime_state->set_runtime_filter_mgr(
_runtime_filter_states[i]->runtime_filter_mgr);
}
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
task_runtime_state->set_task_num(pipeline->num_tasks());
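Note: inlining the former init_runtime_state lambda removes an ordering hazard. The lambda captured the local runtime_filter_mgr by reference and DCHECK-ed it, so calling it before the manager was constructed would fail. A minimal standalone sketch of that hazard, with hypothetical names:

    #include <cassert>
    #include <memory>

    struct RuntimeFilterMgr {};

    int main() {
        std::unique_ptr<RuntimeFilterMgr> mgr;        // still null at capture time
        auto init = [&] { assert(mgr != nullptr); };  // captures mgr by reference
        // init();                                    // calling here would trip the assert
        mgr = std::make_unique<RuntimeFilterMgr>();
        init();                                       // safe only after mgr is constructed
        return 0;
    }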
Expand Down Expand Up @@ -498,22 +489,12 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
* Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
* and JoinProbeOperator2.
*/

// First, set up the parent profile,task runtime state

auto prepare_and_set_parent_profile = [&](PipelineTask* task, size_t pip_idx) {
DCHECK(pipeline_id_to_profile[pip_idx]);
RETURN_IF_ERROR(
task->prepare(local_params, request.fragment.output_sink, _query_ctx.get()));
return Status::OK();
};

for (auto& _pipeline : _pipelines) {
if (pipeline_id_to_task.contains(_pipeline->id())) {
auto* task = pipeline_id_to_task[_pipeline->id()];
DCHECK(task != nullptr);

// if this task has upstream dependency, then record them.
// If this task has upstream dependency, then inject it into this task.
if (_dag.find(_pipeline->id()) != _dag.end()) {
auto& deps = _dag[_pipeline->id()];
for (auto& dep : deps) {
@@ -533,7 +514,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
DCHECK(pipeline_id_to_profile[pip_idx]);
RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink,
_query_ctx.get()));
}
}
{
@@ -545,6 +528,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
if (target_size > 1 &&
(_runtime_state->query_options().__isset.parallel_prepare_threshold &&
target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
// If instance parallelism is large enough (> parallel_prepare_threshold), prepare all tasks with multiple threads
std::vector<Status> prepare_status(target_size);
std::mutex m;
std::condition_variable cv;
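Note: the prepare_status vector, mutex, and condition variable set up a fan-out/fan-in pattern: each worker prepares one task and records its Status, and the submitting thread waits until every worker has reported back. A self-contained sketch of the pattern under that assumption (plain std::thread stands in for the BE thread pool, and Status is a stand-in type):

    #include <condition_variable>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    struct Status {                                   // stand-in for doris::Status
        std::string msg;
        bool ok() const { return msg.empty(); }
    };

    int main() {
        const size_t target_size = 8;                 // number of tasks to prepare
        std::vector<Status> prepare_status(target_size);
        std::mutex m;
        std::condition_variable cv;
        size_t prepare_done = 0;

        std::vector<std::thread> workers;
        for (size_t i = 0; i < target_size; ++i) {
            workers.emplace_back([&, i] {
                prepare_status[i] = Status {};        // fan-out: prepare task i, record result
                std::lock_guard<std::mutex> lock(m);
                if (++prepare_done == target_size) {
                    cv.notify_one();                  // last finisher wakes the waiter
                }
            });
        }
        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return prepare_done == target_size; });  // fan-in
        }
        for (auto& t : workers) {
            t.join();
        }
        for (const auto& st : prepare_status) {
            if (!st.ok()) return 1;                   // surface the first failure
        }
        return 0;
    }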
@@ -638,8 +622,8 @@ void PipelineFragmentContext::trigger_report_if_necessary() {
_runtime_state->load_channel_profile()->pretty_print(&ss);
}

VLOG_FILE << "Query " << print_id(this->get_query_id()) << " fragment "
<< this->get_fragment_id() << " profile:\n"
VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
<< " profile:\n"
<< ss.str();
}
auto st = send_report(false);
Expand Down Expand Up @@ -932,7 +916,6 @@ Status PipelineFragmentContext::_add_local_exchange(
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_distribution) {
DCHECK(_enable_local_shuffle());
if (_num_instances <= 1) {
return Status::OK();
}
@@ -1752,8 +1735,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
_runtime_state->load_channel_profile()->pretty_print(&ss);
}

LOG_INFO("Query {} fragment {} profile:\n {}", print_id(this->_query_id),
this->_fragment_id, ss.str());
LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
}

if (_query_ctx->enable_profile()) {
@@ -1768,7 +1750,6 @@

void PipelineFragmentContext::close_a_pipeline() {
std::lock_guard<std::mutex> l(_task_mutex);
g_pipeline_tasks_count << -1;
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
_close_fragment_instance();
@@ -1836,9 +1817,9 @@ PipelineFragmentContext::collect_realtime_profile() const {
// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this function is invoked after fragment context
// has already been prepared.
if (!this->_prepared) {
if (!_prepared) {
std::string msg =
"Query " + print_id(this->_query_id) + " collecting profile, but its not prepared";
"Query " + print_id(_query_id) + " collecting profile, but its not prepared";
DCHECK(false) << msg;
LOG_ERROR(msg);
return res;
@@ -1864,29 +1845,29 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const {
// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this function is invoked after fragment context
// has already been prepared.
if (!this->_prepared) {
if (!_prepared) {
std::string msg =
"Query " + print_id(this->_query_id) + " collecting profile, but its not prepared";
"Query " + print_id(_query_id) + " collecting profile, but its not prepared";
DCHECK(false) << msg;
LOG_ERROR(msg);
return nullptr;
}

for (auto& runtime_states : _task_runtime_states) {
for (auto& runtime_state : runtime_states) {
if (runtime_state->runtime_profile() == nullptr) {
if (runtime_state == nullptr || runtime_state->runtime_profile() == nullptr) {
continue;
}

auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
}
}

auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
this->_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
return load_channel_profile;
}

23 changes: 16 additions & 7 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -78,9 +78,7 @@ class PipelineFragmentContext : public TaskExecutionContext {

int timeout_second() const { return _timeout; }

PipelinePtr add_pipeline();

PipelinePtr add_pipeline(PipelinePtr parent, int idx = -1);
PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);

RuntimeState* get_runtime_state() { return _runtime_state.get(); }

@@ -185,8 +183,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_hash_distribution);

bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); }

Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
void _close_fragment_instance();
@@ -220,7 +216,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
RuntimeProfile::Counter* _prepare_timer = nullptr;
RuntimeProfile::Counter* _init_context_timer = nullptr;
RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
RuntimeProfile::Counter* _plan_local_exchanger_timer = nullptr;
RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
RuntimeProfile::Counter* _build_tasks_timer = nullptr;

@@ -303,7 +299,20 @@ class PipelineFragmentContext : public TaskExecutionContext {
// - _task_runtime_states is at the task level, unique to each task.

std::vector<TUniqueId> _fragment_instance_ids;
// Local runtime states for each task
/**
* Local runtime states for each task.
*
* 2-D matrix:
* +-------------------------+------------+-------+
* | | Instance 0 | Instance 1 | ... |
* +------------+------------+------------+-------+
* | Pipeline 0 | task 0-0 | task 0-1 | ... |
* +------------+------------+------------+-------+
* | Pipeline 1 | task 1-0 | task 1-1 | ... |
* +------------+------------+------------+-------+
* | ... |
* +--------------------------------------+-------+
*/
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;

std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
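Note: per the matrix comment added above, the outer index of _task_runtime_states selects the pipeline and the inner index selects the fragment instance. A hypothetical accessor (not present in this commit) that makes the orientation explicit:

    // Hypothetical helper, assuming the documented layout:
    // _task_runtime_states[pipeline_idx][instance_idx] owns the state of one task.
    RuntimeState* task_state(size_t pipeline_idx, size_t instance_idx) const {
        return _task_runtime_states[pipeline_idx][instance_idx].get();
    }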
7 changes: 1 addition & 6 deletions be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -41,15 +41,10 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type_number.h"

namespace doris {
namespace vectorized {
namespace doris::vectorized {
class Arena;
class BufferReadable;
class BufferWritable;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {

struct RowNumberData {
int64_t count = 0;
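Note: this hunk collapses the nested namespace blocks into a single C++17 nested namespace definition; the two spellings below are equivalent, the second being the form the commit adopts:

    // Pre-C++17: one block per namespace level.
    namespace doris {
    namespace vectorized {
    class Arena;
    } // namespace vectorized
    } // namespace doris

    // C++17 nested namespace definition: same entities, flatter syntax.
    namespace doris::vectorized {
    class BufferReadable;
    } // namespace doris::vectorized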
