diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 05f698ffeba6ff..ec048d0db51ab8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -144,12 +144,7 @@ PipelineFragmentContext::~PipelineFragmentContext() { auto st = _query_ctx->exec_status(); for (size_t i = 0; i < _tasks.size(); i++) { if (!_tasks[i].empty()) { - _call_back(_tasks[i].front()->runtime_state(), &st); - } - } - for (auto& runtime_states : _task_runtime_states) { - for (auto& runtime_state : runtime_states) { - runtime_state.reset(); + _call_back(_tasks[i].front().first->runtime_state(), &st); } } _tasks.clear(); @@ -234,7 +229,7 @@ void PipelineFragmentContext::cancel(const Status reason) { for (auto& tasks : _tasks) { for (auto& task : tasks) { - task->terminate(); + task.first->terminate(); } } } @@ -379,9 +374,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { const auto target_size = _params.local_params.size(); _tasks.resize(target_size); _runtime_filter_mgr_map.resize(target_size); - _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { - _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); } auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size()); @@ -416,14 +409,10 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto& pipeline = _pipelines[pip_idx]; if (pipeline->num_tasks() > 1 || i == 0) { - DCHECK(_task_runtime_states[pip_idx][i] == nullptr) - << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " " - << pipeline->debug_string(); - _task_runtime_states[pip_idx][i] = RuntimeState::create_unique( + auto task_runtime_state = RuntimeState::create_unique( local_params.fragment_instance_id, _params.query_id, _params.fragment_id, _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - auto& task_runtime_state = _task_runtime_states[pip_idx][i]; { // Initialize runtime state for this task task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker()); @@ -470,7 +459,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline), i); pipeline->incr_created_tasks(i, task.get()); pipeline_id_to_task.insert({pipeline->id(), task.get()}); - _tasks[i].emplace_back(std::move(task)); + _tasks[i].emplace_back( + std::pair, std::unique_ptr> { + std::move(task), std::move(task_runtime_state)}); } } @@ -1703,7 +1694,7 @@ Status PipelineFragmentContext::submit() { auto* scheduler = _query_ctx->get_pipe_exec_scheduler(); for (auto& task : _tasks) { for (auto& t : task) { - st = scheduler->submit(t); + st = scheduler->submit(t.first); DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed", { st = Status::Aborted("PipelineFragmentContext.submit.failed"); }); if (!st) { @@ -1810,12 +1801,9 @@ std::string PipelineFragmentContext::get_load_error_url() { if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) { return to_load_error_http_path(str); } - for (auto& task_states : _task_runtime_states) { - for (auto& task_state : task_states) { - if (!task_state) { - continue; - } - if (const auto& str = task_state->get_error_log_file_path(); !str.empty()) { + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) { return to_load_error_http_path(str); } } @@ -1827,12 +1815,9 @@ std::string PipelineFragmentContext::get_first_error_msg() { if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) { return str; } - for (auto& task_states : _task_runtime_states) { - for (auto& task_state : task_states) { - if (!task_state) { - continue; - } - if (const auto& str = task_state->get_first_error_msg(); !str.empty()) { + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + if (const auto& str = task.second->get_first_error_msg(); !str.empty()) { return str; } } @@ -1862,11 +1847,9 @@ Status PipelineFragmentContext::send_report(bool done) { std::vector runtime_states; - for (auto& task_states : _task_runtime_states) { - for (auto& task_state : task_states) { - if (task_state) { - runtime_states.push_back(task_state.get()); - } + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + runtime_states.push_back(task.second.get()); } } @@ -1900,15 +1883,15 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const // here to traverse the vector. for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { - if (task->is_running()) { + if (task.first->is_running()) { LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id) - << " is running, task: " << (void*)task.get() - << ", is_running: " << task->is_running(); + << " is running, task: " << (void*)task.first.get() + << ", is_running: " << task.first->is_running(); *has_running_task = true; return 0; } - size_t revocable_size = task->get_revocable_size(); + size_t revocable_size = task.first->get_revocable_size(); if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { res += revocable_size; } @@ -1921,9 +1904,9 @@ std::vector PipelineFragmentContext::get_revocable_tasks() const std::vector revocable_tasks; for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { - size_t revocable_size_ = task->get_revocable_size(); + size_t revocable_size_ = task.first->get_revocable_size(); if (revocable_size_ >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - revocable_tasks.emplace_back(task.get()); + revocable_tasks.emplace_back(task.first.get()); } } } @@ -1936,9 +1919,8 @@ std::string PipelineFragmentContext::debug_string() { for (size_t j = 0; j < _tasks.size(); j++) { fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); for (size_t i = 0; i < _tasks[j].size(); i++) { - fmt::format_to(debug_string_buffer, "Task {}: {}\n{}\n", i, - _tasks[j][i]->debug_string(), - _task_runtime_states[i][j]->local_runtime_filter_mgr()->debug_string()); + fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, + _tasks[j][i].first->debug_string()); } } @@ -1988,16 +1970,16 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const { return nullptr; } - for (const auto& runtime_states : _task_runtime_states) { - for (const auto& runtime_state : runtime_states) { - if (runtime_state == nullptr || runtime_state->runtime_profile() == nullptr) { + for (const auto& tasks : _tasks) { + for (const auto& task : tasks) { + if (task.second->runtime_profile() == nullptr) { continue; } auto tmp_load_channel_profile = std::make_shared(); - runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get(), - _runtime_state->profile_level()); + task.second->runtime_profile()->to_thrift(tmp_load_channel_profile.get(), + _runtime_state->profile_level()); _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile); } } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f3ee112b0a049e..81b3f57b01fbf8 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -117,7 +117,7 @@ class PipelineFragmentContext : public TaskExecutionContext { void clear_finished_tasks() { for (size_t j = 0; j < _tasks.size(); j++) { for (size_t i = 0; i < _tasks[j].size(); i++) { - _tasks[j][i]->stop_if_finished(); + _tasks[j][i].first->stop_if_finished(); } } } @@ -228,8 +228,25 @@ class PipelineFragmentContext : public TaskExecutionContext { bool _use_serial_source = false; OperatorPtr _root_op = nullptr; - // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. - std::vector>> _tasks; + // + /** + * Matrix stores tasks with local runtime states. + * This is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. + * + * 2-D matrix: + * +-------------------------+------------+-------+ + * | | Pipeline 0 | Pipeline 1 | ... | + * +------------+------------+------------+-------+ + * | Instance 0 | task 0-0 | task 0-1 | ... | + * +------------+------------+------------+-------+ + * | Instance 1 | task 1-0 | task 1-1 | ... | + * +------------+------------+------------+-------+ + * | ... | + * +--------------------------------------+-------+ + */ + std::vector< + std::vector, std::unique_ptr>>> + _tasks; // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both // of it in pipeline task not the fragment_context @@ -299,21 +316,6 @@ class PipelineFragmentContext : public TaskExecutionContext { // - _task_runtime_states is at the task level, unique to each task. std::vector _fragment_instance_ids; - /** - * Local runtime states for each task. - * - * 2-D matrix: - * +-------------------------+------------+-------+ - * | | Instance 0 | Instance 1 | ... | - * +------------+------------+------------+-------+ - * | Pipeline 0 | task 0-0 | task 0-1 | ... | - * +------------+------------+------------+-------+ - * | Pipeline 1 | task 1-0 | task 1-1 | ... | - * +------------+------------+------------+-------+ - * | ... | - * +--------------------------------------+-------+ - */ - std::vector>> _task_runtime_states; // Total instance num running on all BEs int _total_instances = -1; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 2da386e83a1c16..e069cb840d1ca3 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -735,10 +735,15 @@ std::string PipelineTask::debug_string() { return fmt::to_string(debug_string_buffer); } auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC; - fmt::format_to(debug_string_buffer, - " elapse time = {}s, block dependency = [{}]\noperators: ", elapsed, + fmt::format_to(debug_string_buffer, " elapse time = {}s, block dependency = [{}]\n", elapsed, cur_blocked_dep && !is_finalized() ? cur_blocked_dep->debug_string() : "NULL"); + if (_state && _state->local_runtime_filter_mgr()) { + fmt::format_to(debug_string_buffer, "local_runtime_filter_mgr: [{}]\n", + _state->local_runtime_filter_mgr()->debug_string()); + } + + fmt::format_to(debug_string_buffer, "operators: "); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}", _opened && !is_finalized()