[Improvement](pipeline) Do parallel preparation for multiple fragments
Gabriel39 committed Sep 14, 2024
1 parent 957e2b1 commit e0034fc
Showing 18 changed files with 441 additions and 29 deletions.
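
At its core, this change lets the backend prepare the fragments of one query concurrently rather than one after another. A minimal sketch of that pattern, using simplified hypothetical types (Status, Fragment) and std::async in place of the explicit ThreadPool* that the commit threads through the real prepare() calls:

#include <future>
#include <string>
#include <vector>

struct Status {
    bool ok = true;
    std::string msg;
};

struct Fragment {
    // stands in for PipelineXFragmentContext::prepare()
    Status prepare() { return {}; }
};

// Prepare every fragment concurrently; join all tasks and report the first error.
Status prepare_all(std::vector<Fragment>& fragments) {
    std::vector<std::future<Status>> futures;
    futures.reserve(fragments.size());
    for (auto& f : fragments) {
        // The real code submits to an explicit ThreadPool; std::async merely
        // keeps this sketch self-contained.
        futures.push_back(std::async(std::launch::async, [&f] { return f.prepare(); }));
    }
    Status result;
    for (auto& fut : futures) {
        Status s = fut.get();  // always join, even after a failure
        if (!s.ok && result.ok) result = s;
    }
    return result;
}
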
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -68,8 +68,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_should_build_hash_table = info.task_idx == 0;
if (_should_build_hash_table) {
profile()->add_info_string("ShareHashTableEnabled", "true");
- CHECK(p._shared_hashtable_controller->should_build_hash_table(
- state->fragment_instance_id(), p.node_id()));
+ p._shared_hashtable_controller->set_builder_and_consumers(
+ state->fragment_instance_id(), p.node_id());
}
} else {
profile()->add_info_string("ShareHashTableEnabled", "false");
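
In the hunk above, the hash table build role is pinned to task_idx == 0 and registered explicitly through set_builder_and_consumers(), rather than letting the controller pick whichever instance asked first; with preparation now running in parallel, arrival order is presumably no longer deterministic. A hedged sketch of the idea, with made-up minimal types:

struct SharedHashTableController {
    int builder_task = -1;
    // The designated builder registers itself; consumers will wait on it.
    void set_builder_and_consumers(int task_idx) { builder_task = task_idx; }
    bool is_builder(int task_idx) const { return task_idx == builder_task; }
};

void init_local_state(SharedHashTableController& ctrl, int task_idx) {
    bool should_build_hash_table = (task_idx == 0);  // deterministic, no race on arrival order
    if (should_build_hash_table) {
        ctrl.set_builder_and_consumers(task_idx);
    }
}
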
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.h
@@ -92,6 +92,10 @@ class PipelineFragmentContext : public TaskExecutionContext {
virtual Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
return Status::InternalError("Pipeline fragment context do not implement prepare");
}
virtual Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool,
const TQueryOptions& query_options) {
return Status::InternalError("Pipeline fragment context do not implement prepare");
}

virtual Status submit();

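
The base class gains a second prepare() overload whose default body just returns InternalError; only subclasses that support the new three-argument form (PipelineXFragmentContext, below) override it. A tiny, hypothetical illustration of this stub-in-base pattern:

#include <iostream>

struct BaseContext {
    virtual ~BaseContext() = default;
    // Default stub fails loudly, mirroring the InternalError above.
    virtual int prepare(int request, int query_options) { return -1; }
};

struct XContext : BaseContext {
    int prepare(int request, int query_options) override { return 0; }  // supported path
};

int main() {
    XContext ctx;
    std::cout << ctx.prepare(1, 2) << "\n";  // prints 0: the override runs
}
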
103 changes: 98 additions & 5 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -178,6 +178,98 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
}
}

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool,
const TQueryOptions& query_options) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
_num_instances = request.local_params.size();
_total_instances = request.__isset.total_instances ? request.total_instances : _num_instances;
_runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);

auto* fragment_context = this;

LOG_INFO("PipelineXFragmentContext::prepare")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
.tag("pthread_id", (uintptr_t)pthread_self());

if (query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(query_options.is_report_success);
}

// 1. Set up the global runtime state.
_runtime_state =
RuntimeState::create_unique(request.query_id, request.fragment_id, query_options,
_query_ctx->query_globals, _exec_env, _query_ctx.get());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
_runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
_runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
_runtime_state->set_load_job_id(request.load_job_id);
}

_desc_tbl = _query_ctx->desc_tbl;
_runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);

const auto& local_params = request.local_params[0];
_need_local_merge = request.__isset.parallel_instances;

// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
_runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, &_root_op, root_pipeline));

// 3. Create sink operator
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
request, root_pipeline->output_row_desc(), _runtime_state.get(), *_desc_tbl,
root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

for (PipelinePtr& pipeline : _pipelines) {
DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
}
if (_enable_local_shuffle()) {
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}

// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
pipeline->children().clear();
RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
}

// 5. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool, query_options));

_init_next_report_time();

_prepared = true;
return Status::OK();
}

Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool) {
if (_prepared) {
@@ -234,7 +326,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
const auto& local_params = request.local_params[0];
if (local_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
- local_params.runtime_filter_params);
+ local_params.runtime_filter_params.runtime_filter_merge_addr);
}
if (local_params.__isset.topn_filter_source_node_ids) {
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
@@ -277,7 +369,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}

// 5. Build pipeline tasks and initialize local state.
- RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
+ RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool, request.query_options));

_init_next_report_time();

@@ -501,7 +593,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}

Status PipelineXFragmentContext::_build_pipeline_x_tasks(
- const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) {
+ const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool,
+ const TQueryOptions& query_options) {
_total_tasks = 0;
int target_size = request.local_params.size();
_tasks.resize(target_size);
@@ -613,8 +706,8 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks(
// build task runtime state
_task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
this, local_params.fragment_instance_id, request.query_id,
- request.fragment_id, request.query_options, _query_ctx->query_globals,
- _exec_env, _query_ctx.get());
+ request.fragment_id, query_options, _query_ctx->query_globals, _exec_env,
+ _query_ctx.get());
auto& task_runtime_state = _task_runtime_states[pip_idx][i];
init_runtime_state(task_runtime_state);
task_runtime_state->set_task_id(cur_task_id);
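
Throughout this file, prepare() and _build_pipeline_x_tasks() now receive TQueryOptions explicitly instead of reading request.query_options, so a batched caller can hand one shared options struct to every fragment of the query. A hedged sketch of the resulting call shape, with illustrative stand-in types (the "Like" suffix marks everything here as hypothetical):

#include <vector>

struct TQueryOptionsLike { bool is_report_success = false; };
struct FragmentParamsLike { int fragment_id = 0; };

// Stand-in for TPipelineFragmentParamsList: per-fragment params plus one
// shared set of query options.
struct BatchLike {
    std::vector<FragmentParamsLike> params_list;
    TQueryOptionsLike query_options;
};

struct ThreadPoolLike {};

struct FragmentContextLike {
    int prepare(const FragmentParamsLike&, ThreadPoolLike*, const TQueryOptionsLike&) {
        return 0;  // 0 == OK in this sketch
    }
};

int prepare_one(FragmentContextLike& ctx, const BatchLike& batch, int i,
                ThreadPoolLike* pool) {
    // The per-fragment request no longer carries its own copy of the options.
    return ctx.prepare(batch.params_list[i], pool, batch.query_options);
}
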
4 changes: 3 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -100,6 +100,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

// Prepare global information including global states and the unique operator tree shared by all pipeline tasks.
Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool) override;
Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool,
const TQueryOptions& query_options) override;

Status submit() override;

@@ -124,7 +126,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
private:
void _close_fragment_instance() override;
Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request,
- ThreadPool* thread_pool);
+ ThreadPool* thread_pool, const TQueryOptions& query_options);
Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool,
PipelinePtr cur_pipe, DataDistribution data_distribution,
bool* do_local_exchange, int num_buckets,
192 changes: 192 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
@@ -710,6 +710,83 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
return Status::OK();
}

Status FragmentMgr::_get_query_ctx(const TPipelineFragmentParamsList& t_request, TUniqueId query_id,
std::shared_ptr<QueryContext>& query_ctx) {
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
// This may be the first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(query_id, t_request.fragment_num_on_host, _exec_env,
t_request.query_options, t_request.coord, true,
t_request.is_nereids);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), t_request.desc_tbl,
&(query_ctx->desc_tbl)));
// set file scan range params
if (t_request.__isset.file_scan_params) {
query_ctx->file_scan_range_params_map = t_request.file_scan_params;
}

LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo)
<< " coord_addr " << query_ctx->coord_addr
<< " total fragment num on current host: " << t_request.fragment_num_on_host
<< " fe process uuid: " << t_request.query_options.fe_process_uuid;
query_ctx->query_globals = t_request.query_globals;

if (t_request.__isset.resource_info) {
query_ctx->user = t_request.resource_info.user;
query_ctx->group = t_request.resource_info.group;
query_ctx->set_rsc_info = true;
}

query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true);
_set_scan_concurrency(t_request, query_ctx.get());

if (t_request.__isset.workload_groups && !t_request.workload_groups.empty()) {
uint64_t tg_id = t_request.workload_groups[0].id;
WorkloadGroupPtr workload_group_ptr =
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
if (workload_group_ptr != nullptr) {
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
tg_id);

LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< ", use workload group: " << workload_group_ptr->debug_string()
<< ", enable cgroup soft limit: "
<< ((int)config::enable_cgroup_cpu_soft_limit);
} else {
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< " carried group info but can not find group in be";
}
}
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), t_request.desc_tbl,
&(query_ctx->desc_tbl)));

if (t_request.__isset.topn_filter_source_node_ids) {
query_ctx->init_runtime_predicates(t_request.topn_filter_source_node_ids);
} else {
query_ctx->init_runtime_predicates({0});
}
if (t_request.__isset.runtime_filter_merge_addr) {
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
t_request.runtime_filter_merge_addr);
}

// There is some logic in query ctx's dtor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from workload group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
} else {
query_ctx = search->second;
}
return Status::OK();
}
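
The new _get_query_ctx() overload above is a find-or-create under a single lock: fragments of one query may now arrive concurrently, the first constructs the shared QueryContext, and later ones reuse it. A stripped-down sketch with hypothetical types:

#include <map>
#include <memory>
#include <mutex>

struct QueryContext {
    int query_id = 0;
};

class QueryRegistry {
public:
    std::shared_ptr<QueryContext> get_or_create(int query_id) {
        std::lock_guard<std::mutex> lock(_lock);  // one lock guards the whole map
        auto it = _query_ctx_map.find(query_id);
        if (it != _query_ctx_map.end()) {
            return it->second;  // a later fragment of the query: reuse
        }
        auto ctx = std::make_shared<QueryContext>();
        ctx->query_id = query_id;
        _query_ctx_map.emplace(query_id, ctx);  // first fragment: create and register
        return ctx;
    }

private:
    std::mutex _lock;
    std::map<int, std::shared_ptr<QueryContext>> _query_ctx_map;
};
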

Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
const FinishCallback& cb) {
VLOG_ROW << "exec_plan_fragment params is "
@@ -1017,6 +1094,121 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
return Status::OK();
}

Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParamsList& t_request, const int i) {
if (t_request.params_list[i].txn_conf.need_txn) {
std::shared_ptr<StreamLoadContext> stream_load_ctx =
std::make_shared<StreamLoadContext>(_exec_env);
stream_load_ctx->db = t_request.params_list[i].txn_conf.db;
stream_load_ctx->db_id = t_request.params_list[i].txn_conf.db_id;
stream_load_ctx->table = t_request.params_list[i].txn_conf.tbl;
stream_load_ctx->txn_id = t_request.params_list[i].txn_conf.txn_id;
stream_load_ctx->id = UniqueId(t_request.params_list[i].query_id);
stream_load_ctx->put_result.pipeline_params = t_request.params_list[i];
stream_load_ctx->use_streaming = true;
stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
stream_load_ctx->load_src_type = TLoadSourceType::RAW;
stream_load_ctx->label = t_request.params_list[i].import_label;
stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
stream_load_ctx->timeout_second = 3600;
stream_load_ctx->auth.token = t_request.params_list[i].txn_conf.token;
stream_load_ctx->need_commit_self = true;
stream_load_ctx->need_rollback = true;
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->pipe = pipe;
stream_load_ctx->max_filter_ratio = t_request.params_list[i].txn_conf.max_filter_ratio;

RETURN_IF_ERROR(
_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));

RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
return Status::OK();
} else {
return exec_plan_fragment(t_request, i, empty_function);
}
}
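
This wrapper dispatches on load type: transactional loads are wrapped in a StreamLoadContext fed by an in-memory pipe and executed via the stream load executor, while everything else falls through to the callback-based overload below. A schematic sketch, with simplified hypothetical types:

#include <functional>

struct Status { bool ok = true; };
using FinishCallback = std::function<void()>;
struct ParamsLike { bool need_txn = false; };

Status exec_via_stream_load(const ParamsLike&) { return {}; }               // txn path
Status exec_direct(const ParamsLike&, const FinishCallback&) { return {}; }

Status exec_plan_fragment(const ParamsLike& p) {
    if (p.need_txn) {
        // Transactional load: the stream-load machinery owns commit/rollback.
        return exec_via_stream_load(p);
    }
    return exec_direct(p, [] {});  // the "empty_function" case above
}
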

Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParamsList& t_request, const int i,
const FinishCallback& cb) {
VLOG_ROW << "query: " << print_id(t_request.query_id) << " exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(t_request.params_list[i]).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options separately for debugging purposes
VLOG_ROW << "query: " << print_id(t_request.query_id) << "query options is "
<< apache::thrift::ThriftDebugString(t_request.query_options).c_str();

std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_query_ctx(t_request, t_request.query_id, query_ctx));
SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, t_request.query_id);
DCHECK((t_request.query_options.__isset.enable_pipeline_x_engine &&
t_request.query_options.enable_pipeline_x_engine) ||
(t_request.query_options.__isset.enable_pipeline_engine &&
t_request.query_options.enable_pipeline_engine));
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
query_ctx->query_id(), t_request.params_list[i].fragment_id, query_ctx,
_exec_env, cb,
std::bind<Status>(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report),
this, std::placeholders::_1, std::placeholders::_2));
{
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(t_request.params_list[i], t_request.query_options);
if (!prepare_st.ok()) {
context->close_if_prepare_failed(prepare_st);
query_ctx->set_execution_dependency_ready();
return prepare_st;
}
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);

std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
t_request.params_list[i].local_params[0], t_request.query_id, t_request.query_options,
&handler, RuntimeFilterParamsContext::create(context->get_runtime_state())));
if (handler) {
query_ctx->set_merge_controller_handler(handler);
}

for (const auto& local_param : t_request.params_list[i].local_params) {
const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
std::lock_guard<std::mutex> lock(_lock);
auto iter = _pipeline_map.find(fragment_instance_id);
if (iter != _pipeline_map.end()) {
return Status::InternalError(
"exec_plan_fragment input duplicated fragment_instance_id({})",
UniqueId(fragment_instance_id).to_string());
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}

if (!t_request.params_list[i].__isset.need_wait_execution_trigger ||
!t_request.params_list[i].need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}

int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
{
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
std::vector<TUniqueId> ins_ids;
context->instance_ids(ins_ids);
// TODO: simplify this mapping
for (const auto& ins_id : ins_ids) {
_pipeline_map.insert({ins_id, context});
}
}
query_ctx->set_pipeline_context(t_request.params_list[i].fragment_id, context);

RETURN_IF_ERROR(context->submit());
return Status::OK();
}
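
Before submitting, the function rejects duplicated fragment_instance_ids and only then publishes the context in _pipeline_map. A compact sketch of that guard (hypothetical types; unlike the original, which checks and inserts in two separate lock scopes, this sketch folds both into one critical section):

#include <map>
#include <memory>
#include <mutex>
#include <vector>

struct PipelineContext {};

class PipelineMap {
public:
    // Returns false if any instance id is already registered, mirroring the
    // InternalError("... duplicated fragment_instance_id ...") path above.
    bool register_all(const std::vector<int>& instance_ids,
                      std::shared_ptr<PipelineContext> ctx) {
        std::lock_guard<std::mutex> lock(_lock);
        for (int id : instance_ids) {
            if (_pipeline_map.count(id) != 0) {
                return false;  // duplicated fragment_instance_id
            }
        }
        for (int id : instance_ids) {
            _pipeline_map.emplace(id, ctx);
        }
        return true;
    }

private:
    std::mutex _lock;
    std::map<int, std::shared_ptr<PipelineContext>> _pipeline_map;
};
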

template <typename Param>
void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query_ctx) {
#ifndef BE_TEST
