diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 8b94d4637a8..a50a4d4007b 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -349,6 +349,10 @@ class DAGContext std::vector output_field_types; std::vector output_offsets; + /// Hold the order of list based executors. + /// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors. + std::vector list_based_executors_order; + private: void initExecutorIdToJoinIdMap(); void initOutputInfo(); @@ -356,7 +360,7 @@ class DAGContext private: /// Hold io for correcting the destruction order. BlockIO io; - /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams + /// profile_streams_map is a map that maps from executor_id to profile BlockInputStreams. std::unordered_map profile_streams_map; /// executor_id_to_join_id_map is a map that maps executor id to all the join executor id of itself and all its children. std::unordered_map> executor_id_to_join_id_map; diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index 882699e1599..d68a7b17aaa 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -20,6 +20,26 @@ namespace DB { +namespace +{ +void fillOrderForListBasedExecutors(DAGContext & dag_context, const DAGQueryBlock & query_block) +{ + assert(query_block.source); + auto & list_based_executors_order = dag_context.list_based_executors_order; + list_based_executors_order.push_back(query_block.source_name); + if (query_block.selection) + list_based_executors_order.push_back(query_block.selection_name); + if (query_block.aggregation) + list_based_executors_order.push_back(query_block.aggregation_name); + if (query_block.having) + list_based_executors_order.push_back(query_block.having_name); + if (query_block.limit_or_topn) + list_based_executors_order.push_back(query_block.limit_or_topn_name); + if (query_block.exchange_sender) + dag_context.list_based_executors_order.push_back(query_block.exchange_sender_name); +} +} // namespace + DAGQuerySource::DAGQuerySource(Context & context_) : context(context_) { @@ -32,6 +52,9 @@ DAGQuerySource::DAGQuerySource(Context & context_) else { root_query_block = std::make_shared(1, dag_request.executors()); + auto & dag_context = getDAGContext(); + if (!dag_context.return_executor_id) + fillOrderForListBasedExecutors(dag_context, *root_query_block); } } diff --git a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp index 53bebc91da8..33f6d99f9d8 100644 --- a/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGResponseWriter.cpp @@ -89,12 +89,10 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo } } - /// add execution_summary for local executor - for (auto & p : dag_context.getProfileStreamsMap()) - { + auto fill_execution_summary = [&](const String & executor_id, const BlockInputStreams & streams) { ExecutionSummary current; /// part 1: local execution info - for (auto & stream_ptr : p.second) + for (const auto & stream_ptr : streams) { if (auto * p_stream = dynamic_cast(stream_ptr.get())) { @@ -105,16 +103,16 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo current.concurrency++; } /// part 2: remote execution info - if (merged_remote_execution_summaries.find(p.first) != merged_remote_execution_summaries.end()) + if (merged_remote_execution_summaries.find(executor_id) != merged_remote_execution_summaries.end()) { - for (auto & remote : merged_remote_execution_summaries[p.first]) + for (auto & remote : merged_remote_execution_summaries[executor_id]) current.merge(remote, false); } /// part 3: for join need to add the build time /// In TiFlash, a hash join's build side is finished before probe side starts, /// so the join probe side's running time does not include hash table's build time, /// when construct ExecSummaries, we need add the build cost to probe executor - auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(p.first); + auto all_join_id_it = dag_context.getExecutorIdToJoinIdMap().find(executor_id); if (all_join_id_it != dag_context.getExecutorIdToJoinIdMap().end()) { for (const auto & join_executor_id : all_join_id_it->second) @@ -138,8 +136,27 @@ void DAGResponseWriter::addExecuteSummaries(tipb::SelectResponse & response, boo } current.time_processed_ns += dag_context.compile_time_ns; - fillTiExecutionSummary(response.add_execution_summaries(), current, p.first, delta_mode); + fillTiExecutionSummary(response.add_execution_summaries(), current, executor_id, delta_mode); + }; + + /// add execution_summary for local executor + if (dag_context.return_executor_id) + { + for (auto & p : dag_context.getProfileStreamsMap()) + fill_execution_summary(p.first, p.second); + } + else + { + const auto & profile_streams_map = dag_context.getProfileStreamsMap(); + assert(profile_streams_map.size() == dag_context.list_based_executors_order.size()); + for (const auto & executor_id : dag_context.list_based_executors_order) + { + auto it = profile_streams_map.find(executor_id); + assert(it != profile_streams_map.end()); + fill_execution_summary(executor_id, it->second); + } } + for (auto & p : merged_remote_execution_summaries) { if (local_executors.find(p.first) == local_executors.end())