diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index a38eeef3145..1f6618d3170 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -14,6 +14,8 @@ #include #include +#include +#include #include #include #include @@ -33,6 +35,24 @@ bool strictSqlMode(UInt64 sql_mode) return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES; } +void DAGContext::initOutputInfo() +{ + output_field_types = collectOutputFieldTypes(*dag_request); + output_offsets.clear(); + result_field_types.clear(); + for (UInt32 i : dag_request->output_offsets()) + { + output_offsets.push_back(i); + if (unlikely(i >= output_field_types.size())) + throw TiFlashException( + fmt::format("{}: Invalid output offset(schema has {} columns, access index {}", __PRETTY_FUNCTION__, output_field_types.size(), i), + Errors::Coprocessor::BadRequest); + result_field_types.push_back(output_field_types[i]); + } + encode_type = analyzeDAGEncodeType(*this); + keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock; +} + bool DAGContext::allowZeroInDate() const { return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index b1c92a9035e..30397dc496a 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -25,8 +25,8 @@ #include #include +#include #include -#include #include #include #include @@ -112,6 +112,7 @@ constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul; class DAGContext { public: + // for non-mpp(cop/batchCop) explicit DAGContext(const tipb::DAGRequest & dag_request_) : dag_request(&dag_request_) , collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries()) @@ -126,8 +127,11 @@ class DAGContext { assert(dag_request->has_root_executor() || dag_request->executors_size() > 0); return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id(); + + initOutputInfo(); } + // for mpp DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_) : dag_request(&dag_request_) , collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries()) @@ -144,8 +148,13 @@ class DAGContext , warning_count(0) { assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id()); + + // only mpp task has join executor. + initExecutorIdToJoinIdMap(); + initOutputInfo(); } + // for test explicit DAGContext(UInt64 max_error_count_) : dag_request(nullptr) , collect_execution_summaries(false) @@ -162,7 +171,6 @@ class DAGContext void attachBlockIO(const BlockIO & io_); std::unordered_map & getProfileStreamsMap(); - void initExecutorIdToJoinIdMap(); std::unordered_map> & getExecutorIdToJoinIdMap(); std::unordered_map & getJoinExecuteInfoMap(); @@ -291,9 +299,17 @@ class DAGContext LoggerPtr log; - bool keep_session_timezone_info = false; + // initialized in `initOutputInfo`. std::vector result_field_types; tipb::EncodeType encode_type = tipb::EncodeType::TypeDefault; + // only meaningful in final projection. + bool keep_session_timezone_info = false; + std::vector output_field_types; + std::vector output_offsets; + +private: + void initExecutorIdToJoinIdMap(); + void initOutputInfo(); private: /// Hold io for correcting the destruction order. diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h index 6ad35bc63be..486345efa03 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h @@ -68,10 +68,6 @@ class DAGQueryBlock String qb_column_prefix; std::vector> children; - // only meaningful for root query block. - std::vector output_field_types; - std::vector output_offsets; - bool isRootQueryBlock() const { return id == 1; }; bool isTableScanSource() const { return source->tp() == tipb::ExecType::TypeTableScan || source->tp() == tipb::ExecType::TypePartitionTableScan; } }; diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 6c3c6700577..51cd1bf671f 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -57,12 +57,10 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter( const std::vector & input_streams_vec_, const DAGQueryBlock & query_block_, size_t max_streams_, - bool keep_session_timezone_info_, std::vector & subqueries_for_sets_) : context(context_) , input_streams_vec(input_streams_vec_) , query_block(query_block_) - , keep_session_timezone_info(keep_session_timezone_info_) , max_streams(max_streams_) , subqueries_for_sets(subqueries_for_sets_) , log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : "")) @@ -118,7 +116,6 @@ AnalysisResult analyzeExpressions( Context & context, DAGExpressionAnalyzer & analyzer, const DAGQueryBlock & query_block, - bool keep_session_timezone_info, NamesWithAliases & final_project) { AnalysisResult res; @@ -174,14 +171,15 @@ AnalysisResult analyzeExpressions( res.order_columns = analyzer.appendOrderBy(chain, query_block.limit_or_topn->topn()); } + const auto & dag_context = *context.getDAGContext(); // Append final project results if needed. final_project = query_block.isRootQueryBlock() ? analyzer.appendFinalProjectForRootQueryBlock( chain, - query_block.output_field_types, - query_block.output_offsets, + dag_context.output_field_types, + dag_context.output_offsets, query_block.qb_column_prefix, - keep_session_timezone_info) + dag_context.keep_session_timezone_info) : analyzer.appendFinalProjectForNonRootQueryBlock( chain, query_block.qb_column_prefix); @@ -1057,7 +1055,6 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) context, *analyzer, query_block, - keep_session_timezone_info, final_project); if (res.before_where) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 5325b76eec6..35627cd19ee 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -48,7 +48,6 @@ class DAGQueryBlockInterpreter const std::vector & input_streams_vec_, const DAGQueryBlock & query_block_, size_t max_streams_, - bool keep_session_timezone_info_, std::vector & subqueries_for_sets_); ~DAGQueryBlockInterpreter() = default; @@ -110,7 +109,6 @@ class DAGQueryBlockInterpreter Context & context; std::vector input_streams_vec; const DAGQueryBlock & query_block; - const bool keep_session_timezone_info; NamesWithAliases final_project; diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index 72d93f86e85..882699e1599 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -20,11 +20,6 @@ namespace DB { -namespace ErrorCodes -{ -extern const int COP_BAD_DAG_REQUEST; -} // namespace ErrorCodes - DAGQuerySource::DAGQuerySource(Context & context_) : context(context_) { @@ -38,22 +33,6 @@ DAGQuerySource::DAGQuerySource(Context & context_) { root_query_block = std::make_shared(1, dag_request.executors()); } - - root_query_block->output_field_types = collectOutputFieldTypes(dag_request); - getDAGContext().initExecutorIdToJoinIdMap(); - - for (UInt32 i : dag_request.output_offsets()) - { - root_query_block->output_offsets.push_back(i); - if (unlikely(i >= root_query_block->output_field_types.size())) - throw TiFlashException( - fmt::format("{}: Invalid output offset(schema has {} columns, access index {}", __PRETTY_FUNCTION__, root_query_block->output_field_types.size(), i), - Errors::Coprocessor::BadRequest); - getDAGContext().result_field_types.push_back(root_query_block->output_field_types[i]); - } - auto encode_type = analyzeDAGEncodeType(getDAGContext()); - getDAGContext().encode_type = encode_type; - getDAGContext().keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock; } std::tuple DAGQuerySource::parse(size_t) diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index d3c23fe2e16..1bfe87e5695 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -51,7 +51,6 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block, input_streams_vec, query_block, max_streams, - dagContext().keep_session_timezone_info || !query_block.isRootQueryBlock(), subqueries_for_sets); return query_block_interpreter.execute(); }