diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml index c4c510677b1..85bce2ca9e1 100644 --- a/.github/workflows/license-checker.yml +++ b/.github/workflows/license-checker.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Check License Header - uses: apache/skywalking-eyes@main + uses: apache/skywalking-eyes@v0.3.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index f249bf1a0dc..76fda0b57d0 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -176,21 +177,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream , log(Logger::get(name, req_id, executor_id)) , total_rows(0) { - // generate sample block - ColumnsWithTypeAndName columns; - for (auto & dag_col : remote_reader->getOutputSchema()) - { - auto tp = getDataTypeByColumnInfoForComputingLayer(dag_col.second); - ColumnWithTypeAndName col(tp, dag_col.first); - columns.emplace_back(col); - } - for (size_t i = 0; i < source_num; i++) + for (size_t i = 0; i < source_num; ++i) { execution_summaries_inited[i].store(false); } execution_summaries.resize(source_num); connection_profile_infos.resize(source_num); - sample_block = Block(columns); + sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema()))); } Block getHeader() const override { return sample_block; } diff --git a/dbms/src/Debug/astToExecutor.cpp b/dbms/src/Debug/astToExecutor.cpp index 82f894905e6..a1e9295b3f5 100644 --- a/dbms/src/Debug/astToExecutor.cpp +++ b/dbms/src/Debug/astToExecutor.cpp @@ -170,6 +170,7 @@ std::unordered_map func_name_to_sig({ {"cast_decimal_datetime", tipb::ScalarFuncSig::CastDecimalAsTime}, {"cast_time_datetime", tipb::ScalarFuncSig::CastTimeAsTime}, {"cast_string_datetime", tipb::ScalarFuncSig::CastStringAsTime}, + {"concat", tipb::ScalarFuncSig::Concat}, {"round_int", tipb::ScalarFuncSig::RoundInt}, {"round_uint", tipb::ScalarFuncSig::RoundInt}, {"round_dec", tipb::ScalarFuncSig::RoundDec}, @@ -455,6 +456,14 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr ft->set_collate(collator_id); break; } + case tipb::ScalarFuncSig::Concat: + { + expr->set_sig(it_sig->second); + auto * ft = expr->mutable_field_type(); + ft->set_tp(TiDB::TypeString); + ft->set_collate(collator_id); + break; + } case tipb::ScalarFuncSig::RoundInt: case tipb::ScalarFuncSig::RoundWithFracInt: { diff --git a/dbms/src/Flash/CMakeLists.txt b/dbms/src/Flash/CMakeLists.txt index f250029b333..5162f04fa1b 100644 --- a/dbms/src/Flash/CMakeLists.txt +++ b/dbms/src/Flash/CMakeLists.txt @@ -27,5 +27,6 @@ target_link_libraries(flash_service dbms) if (ENABLE_TESTS) add_subdirectory(Coprocessor/tests) + add_subdirectory(Planner/tests) add_subdirectory(tests) endif () diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 1736e0b6cec..a17eaf53b64 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -75,6 +75,11 @@ std::unordered_map & DAGContext::getProfileStreamsMap return profile_streams_map; } +void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit) +{ + final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), streams_upper_limit); 
+} + void DAGContext::initExecutorIdToJoinIdMap() { // only mpp task has join executor diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index c20eb3a367e..8d84a7c6add 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -297,6 +297,8 @@ class DAGContext return sql_mode & f; } + void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit); + bool isTest() const { return is_test; } void setColumnsForTest(std::unordered_map & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; } ColumnsWithTypeAndName columnsForTest(String executor_id); diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index aa269469cdb..9b765f30cc6 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -1124,30 +1124,40 @@ std::pair DAGExpressionAnalyzer::isCastRequiredForRootFinalProjec return std::make_pair(need_append_type_cast, std::move(need_append_type_cast_vec)); } -NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock( - ExpressionActionsChain & chain, +NamesWithAliases DAGExpressionAnalyzer::buildFinalProjection( + const ExpressionActionsPtr & actions, const std::vector & schema, const std::vector & output_offsets, const String & column_prefix, bool keep_session_timezone_info) { if (unlikely(output_offsets.empty())) - throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR); + throw Exception("DAGRequest without output_offsets", ErrorCodes::LOGICAL_ERROR); bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone; auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets); assert(need_append_type_cast_vec.size() == output_offsets.size()); - auto & step = initAndGetLastStep(chain); - if (need_append_timezone_cast || need_append_type_cast) { // after appendCastForRootFinalProjection, source_columns has been modified. - appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec); + appendCastForRootFinalProjection(actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec); } // generate project aliases from source_columns. 
- NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets); + return genRootFinalProjectAliases(column_prefix, output_offsets); +} + +NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock( + ExpressionActionsChain & chain, + const std::vector & schema, + const std::vector & output_offsets, + const String & column_prefix, + bool keep_session_timezone_info) +{ + auto & step = initAndGetLastStep(chain); + + NamesWithAliases final_project = buildFinalProjection(step.actions, schema, output_offsets, column_prefix, keep_session_timezone_info); for (const auto & name : final_project) { diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 3b7112af02d..7506047b34f 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -56,6 +56,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable const Context & getContext() const { return context; } + void reset(const std::vector & source_columns_) + { + source_columns = source_columns_; + prepared_sets.clear(); + } + const std::vector & getCurrentInputColumns() const; DAGPreparedSets & getPreparedSets() { return prepared_sets; } @@ -102,6 +108,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable ExpressionActionsChain & chain, const String & column_prefix) const; + NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const; + // Generate a project action for root DAGQueryBlock, // to keep the schema of Block and tidb-schema the same. NamesWithAliases appendFinalProjectForRootQueryBlock( @@ -111,6 +119,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable const String & column_prefix, bool keep_session_timezone_info); + NamesWithAliases buildFinalProjection( + const ExpressionActionsPtr & actions, + const std::vector & schema, + const std::vector & output_offsets, + const String & column_prefix, + bool keep_session_timezone_info); + String getActions( const tipb::Expr & expr, const ExpressionActionsPtr & actions, @@ -153,17 +168,37 @@ class DAGExpressionAnalyzer : private boost::noncopyable const tipb::Window & window, const size_t window_columns_start_index); -#ifndef DBMS_PUBLIC_GTEST -private: -#endif NamesAndTypes buildOrderColumns( const ExpressionActionsPtr & actions, const ::google::protobuf::RepeatedPtrField & order_by); + String buildFilterColumn( + const ExpressionActionsPtr & actions, + const std::vector & conditions); + + void buildAggFuncs( + const tipb::Aggregation & aggregation, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns); + + void buildAggGroupBy( + const google::protobuf::RepeatedPtrField & group_by, + const ExpressionActionsPtr & actions, + AggregateDescriptions & aggregate_descriptions, + NamesAndTypes & aggregated_columns, + Names & aggregation_keys, + std::unordered_set & agg_key_set, + bool group_by_collation_sensitive, + TiDB::TiDBCollators & collators); + void appendCastAfterAgg( const ExpressionActionsPtr & actions, const tipb::Aggregation & agg); +#ifndef DBMS_PUBLIC_GTEST +private: +#endif String buildTupleFunctionForGroupConcat( const tipb::Expr & expr, SortDescription & sort_desc, @@ -187,22 +222,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable NamesAndTypes & aggregated_columns, bool empty_input_as_null); - void buildAggFuncs( - const tipb::Aggregation & aggregation, - const ExpressionActionsPtr & 
actions, - AggregateDescriptions & aggregate_descriptions, - NamesAndTypes & aggregated_columns); - - void buildAggGroupBy( - const google::protobuf::RepeatedPtrField & group_by, - const ExpressionActionsPtr & actions, - AggregateDescriptions & aggregate_descriptions, - NamesAndTypes & aggregated_columns, - Names & aggregation_keys, - std::unordered_set & agg_key_set, - bool group_by_collation_sensitive, - TiDB::TiDBCollators & collators); - void fillArgumentDetail( const ExpressionActionsPtr & actions, const tipb::Expr & arg, @@ -275,12 +294,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable const ExpressionActionsPtr & actions, const String & column_name); - String buildFilterColumn( - const ExpressionActionsPtr & actions, - const std::vector & conditions); - - NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const; - NamesWithAliases genRootFinalProjectAliases( const String & column_prefix, const std::vector & output_offsets) const; diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 86d6428c92a..81fe2c8f713 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -268,7 +267,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1; /// build side streams - executeExpression(build_pipeline, build_side_prepare_actions, "append join key and join filters for build side"); + executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side"); // add a HashJoinBuildBlockInputStream to build a shared hash table auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency); build_pipeline.transform([&](auto & stream) { @@ -284,7 +283,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline & join_ptr->init(right_query.source->getHeader(), join_build_concurrency); /// probe side streams - executeExpression(probe_pipeline, probe_side_prepare_actions, "append join key and join filters for probe side"); + executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side"); NamesAndTypes source_columns; for (const auto & p : probe_pipeline.firstStream()->getHeader()) source_columns.emplace_back(p.name, p.type); @@ -349,7 +348,7 @@ void DAGQueryBlockInterpreter::executeWindow( DAGPipeline & pipeline, WindowDescription & window_description) { - executeExpression(pipeline, window_description.before_window, "before window"); + executeExpression(pipeline, window_description.before_window, log, "before window"); /// If there are several streams, we merge them into one executeUnion(pipeline, max_streams, log, false, "merge into one for window input"); @@ -365,10 +364,7 @@ void DAGQueryBlockInterpreter::executeAggregation( AggregateDescriptions & aggregate_descriptions, bool is_final_agg) { - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, expression_actions_ptr, log->identifier()); - stream->setExtraInfo("before aggregation"); - }); + executeExpression(pipeline, expression_actions_ptr, log, "before aggregation"); Block before_agg_header = pipeline.firstStream()->getHeader(); @@ 
-421,56 +417,15 @@ void DAGQueryBlockInterpreter::executeAggregation( } } -void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info) -{ - if (!expressionActionsPtr->getActions().empty()) - { - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, expressionActionsPtr, log->identifier()); - stream->setExtraInfo(extra_info); - }); - } -} - void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc) { - orderStreams(pipeline, sort_desc, 0); + orderStreams(pipeline, max_streams, sort_desc, 0, context, log); } void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns) { Int64 limit = query_block.limit_or_topn->topn().limit(); - orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit); -} - -void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit) -{ - const Settings & settings = context.getSettingsRef(); - - pipeline.transform([&](auto & stream) { - auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); - - /// Limits on sorting - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); - sorting_stream->setLimits(limits); - - stream = sorting_stream; - }); - - /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log, false, "for partial order"); - - /// Merge the sorted blocks. - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - order_descr, - settings.max_block_size, - limit, - settings.max_bytes_before_external_sort, - context.getTemporaryPath(), - log->identifier()); + orderStreams(pipeline, max_streams, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, context, log); } void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) @@ -548,10 +503,7 @@ void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const ti output_columns.emplace_back(alias, col.type); project_cols.emplace_back(col.name, alias); } - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, chain.getLastActions(), log->identifier()); - stream->setExtraInfo("before projection"); - }); + executeExpression(pipeline, chain.getLastActions(), log, "before projection"); executeProject(pipeline, project_cols, "projection"); analyzer = std::make_unique(std::move(output_columns), context); } @@ -566,7 +518,7 @@ void DAGQueryBlockInterpreter::handleWindow(DAGPipeline & pipeline, const tipb:: DAGExpressionAnalyzer dag_analyzer(input_columns, context); WindowDescription window_description = dag_analyzer.buildWindowDescription(window); executeWindow(pipeline, window_description); - executeExpression(pipeline, window_description.after_window, "cast after window"); + executeExpression(pipeline, window_description.after_window, log, "cast after window"); analyzer = std::make_unique(window_description.after_window_columns, context); } @@ -663,7 +615,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) "execution stream size for query block(before aggregation) {} is {}", query_block.qb_column_prefix, pipeline.streams.size()); - 
dagContext().final_concurrency = std::min(std::max(dagContext().final_concurrency, pipeline.streams.size()), max_streams); + dagContext().updateFinalConcurrency(pipeline.streams.size(), max_streams); if (res.before_aggregation) { @@ -678,7 +630,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } if (res.before_order_and_select) { - executeExpression(pipeline, res.before_order_and_select, "before order and select"); + executeExpression(pipeline, res.before_order_and_select, log, "before order and select"); } if (!res.order_columns.empty()) @@ -714,10 +666,7 @@ void DAGQueryBlockInterpreter::executeProject(DAGPipeline & pipeline, NamesWithA if (project_cols.empty()) return; ExpressionActionsPtr project = generateProjectExpressionActions(pipeline.firstStream(), context, project_cols); - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, project, log->identifier()); - stream->setExtraInfo(extra_info); - }); + executeExpression(pipeline, project, log, extra_info); } void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline) diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index e68c4f91cee..cabdd4dc9be 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -67,9 +67,7 @@ class DAGQueryBlockInterpreter void handleWindow(DAGPipeline & pipeline, const tipb::Window & window); void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort); void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column, const String & extra_info = ""); - void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info = ""); void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc); - void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit); void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns); void executeLimit(DAGPipeline & pipeline); void executeWindow( diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index be3475f714f..efb8a08f1d8 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -54,4 +54,15 @@ ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_ } return column_with_type_and_names; } -} // namespace DB \ No newline at end of file + +NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema) +{ + NamesAndTypes names_and_types; + for (const auto & col : dag_schema) + { + auto tp = getDataTypeByColumnInfoForComputingLayer(col.second); + names_and_types.emplace_back(col.first, tp); + } + return names_and_types; +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h index 617f69de925..96f202d800e 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -23,4 +24,5 @@ namespace DB { NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan); ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types); -} // namespace DB \ No newline at end of file +NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema); +} // namespace DB 
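For reference, the TiRemoteBlockInputStream change earlier in this patch now builds its sample block by composing the two helpers declared above. A minimal sketch of that composition (not part of the patch itself; the wrapper function name is hypothetical, while every other identifier appears in this diff):

#include <Flash/Coprocessor/GenSchemaAndColumn.h>

namespace DB
{
// DAGSchema -> NamesAndTypes -> ColumnsWithTypeAndName -> Block:
// toNamesAndTypes maps each (name, ColumnInfo) pair to a computing-layer
// data type, and getColumnWithTypeAndName wraps each pair in an empty
// typed column, which the Block constructor then accepts as a header.
Block buildSampleBlockFromDAGSchema(const DAGSchema & dag_schema)
{
    return Block(getColumnWithTypeAndName(toNamesAndTypes(dag_schema)));
}
} // namespace DB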
diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 570271ec93b..5ca8d26758f 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -68,6 +68,7 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block) } if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block)) { + LOG_FMT_DEBUG(dagContext().log, "use planner for query block with source {}", query_block.source_name); Planner planner( context, input_streams_vec, diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index c9810454218..9de5b83626f 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include #include #include @@ -102,4 +105,55 @@ ExpressionActionsPtr generateProjectExpressionActions( project->add(ExpressionAction::project(project_cols)); return project; } + +void executeExpression( + DAGPipeline & pipeline, + const ExpressionActionsPtr & expr_actions, + const LoggerPtr & log, + const String & extra_info) +{ + if (expr_actions && !expr_actions->getActions().empty()) + { + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, expr_actions, log->identifier()); + stream->setExtraInfo(extra_info); + }); + } +} + +void orderStreams( + DAGPipeline & pipeline, + size_t max_streams, + SortDescription order_descr, + Int64 limit, + const Context & context, + const LoggerPtr & log) +{ + const Settings & settings = context.getSettingsRef(); + + pipeline.transform([&](auto & stream) { + auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); + + /// Limits on sorting + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); + sorting_stream->setLimits(limits); + + stream = sorting_stream; + }); + + /// If there are several streams, we merge them into one + executeUnion(pipeline, max_streams, log, false, "for partial order"); + + /// Merge the sorted blocks.
+ pipeline.firstStream() = std::make_shared( + pipeline.firstStream(), + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 5c4d4721d5e..36280f3b903 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include @@ -44,4 +45,18 @@ ExpressionActionsPtr generateProjectExpressionActions( const BlockInputStreamPtr & stream, const Context & context, const NamesWithAliases & project_cols); + +void executeExpression( + DAGPipeline & pipeline, + const ExpressionActionsPtr & expr_actions, + const LoggerPtr & log, + const String & extra_info = ""); + +void orderStreams( + DAGPipeline & pipeline, + size_t max_streams, + SortDescription order_descr, + Int64 limit, + const Context & context, + const LoggerPtr & log); } // namespace DB diff --git a/dbms/src/Flash/Planner/ExecutorIdGenerator.h b/dbms/src/Flash/Planner/ExecutorIdGenerator.h new file mode 100644 index 00000000000..4c48b5d2aad --- /dev/null +++ b/dbms/src/Flash/Planner/ExecutorIdGenerator.h @@ -0,0 +1,75 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +class ExecutorIdGenerator +{ +public: + String generate(const tipb::Executor & executor) + { + String executor_id = executor.has_executor_id() ? 
executor.executor_id() : doGenerate(executor); + assert(!executor_id.empty()); + if (unlikely(ids.find(executor_id) != ids.end())) + throw TiFlashException( + fmt::format("executor id ({}) duplicate", executor_id), + Errors::Planner::Internal); + ids.insert(executor_id); + return executor_id; + } + +private: + String doGenerate(const tipb::Executor & executor) + { + assert(!executor.has_executor_id()); + switch (executor.tp()) + { + case tipb::ExecType::TypeSelection: + return fmt::format("{}_selection", ++current_id); + case tipb::ExecType::TypeProjection: + return fmt::format("{}_projection", ++current_id); + case tipb::ExecType::TypeStreamAgg: + case tipb::ExecType::TypeAggregation: + return fmt::format("{}_aggregation", ++current_id); + case tipb::ExecType::TypeTopN: + return fmt::format("{}_top_n", ++current_id); + case tipb::ExecType::TypeLimit: + return fmt::format("{}_limit", ++current_id); + case tipb::ExecType::TypeExchangeSender: + return fmt::format("{}_exchange_sender", ++current_id); + case tipb::ExecType::TypeExchangeReceiver: + return fmt::format("{}_exchange_receiver", ++current_id); + default: + throw TiFlashException( + fmt::format("Unsupported executor in DAG request: {}", executor.DebugString()), + Errors::Planner::Unimplemented); + } + } + + UInt32 current_id = 0; + + std::unordered_set ids; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/FinalizeHelper.cpp b/dbms/src/Flash/Planner/FinalizeHelper.cpp new file mode 100644 index 00000000000..fad85908a8d --- /dev/null +++ b/dbms/src/Flash/Planner/FinalizeHelper.cpp @@ -0,0 +1,146 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB::FinalizeHelper +{ +namespace +{ +String namesToString(const Names & names) +{ + return fmt::format("{{{}}}", fmt::join(names, ",")); +} + +String schemaToString(const NamesAndTypes & schema) +{ + FmtBuffer bf; + bf.append("{"); + bf.joinStr( + schema.cbegin(), + schema.cend(), + [](const auto & col, FmtBuffer & fb) { fb.fmtAppend("<{}, {}>", col.name, col.type->getName()); }, + ", "); + bf.append("}"); + return bf.toString(); +} + +String blockMetaToString(const Block & block) +{ + FmtBuffer bf; + bf.append("{"); + bf.joinStr( + block.cbegin(), + block.cend(), + [](const ColumnWithTypeAndName & col, FmtBuffer & fb) { fb.fmtAppend("<{}, {}>", col.name, col.type->getName()); }, + ", "); + bf.append("}"); + return bf.toString(); +} +} // namespace + +void prependProjectInputIfNeed(ExpressionActionsPtr & actions, size_t columns_from_previous) +{ + if (!actions->getRequiredColumnsWithTypes().empty() + && columns_from_previous > actions->getRequiredColumnsWithTypes().size()) + { + actions->prependProjectInput(); + } +} + +void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require) +{ + NameSet schema_set; + for (const auto & column : schema) + schema_set.insert(column.name); + for (const auto & parent_require_column : parent_require) + { + if (unlikely(schema_set.find(parent_require_column) == schema_set.end())) + throw TiFlashException( + fmt::format("schema {} doesn't contain parent require column: {}", schemaToString(schema), parent_require_column), + Errors::Planner::Internal); + } +} + +void checkParentRequireContainsSchema(const Names & parent_require, const NamesAndTypes & schema) +{ + NameSet parent_require_set; + for (const auto & parent_require_column : parent_require) + parent_require_set.insert(parent_require_column); + for (const auto & schema_column : schema) + { + if (unlikely(parent_require_set.find(schema_column.name) == parent_require_set.end())) + throw TiFlashException( + fmt::format("parent require {} doesn't contain schema column: {}", namesToString(parent_require), schema_column.name), + Errors::Planner::Internal); + } +} + +void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema) +{ + for (const auto & schema_column : schema) + { + if (unlikely(!sample_block.has(schema_column.name))) + throw TiFlashException( + fmt::format("sample block {} doesn't contain schema column: {}", blockMetaToString(sample_block), schema_column.name), + Errors::Planner::Internal); + + const auto & type_in_sample_block = sample_block.getByName(schema_column.name).type->getName(); + const auto & type_in_schema = schema_column.type->getName(); + if (unlikely(type_in_sample_block != type_in_schema)) + throw TiFlashException( + fmt::format( + "the type of column `{}` in sample block `{}` is different from the one in schema `{}`", + schema_column.name, + type_in_sample_block, + type_in_schema), + Errors::Planner::Internal); + } +} + +void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block) +{ + std::unordered_map<String, DataTypePtr> schema_map; + for (const auto & column : schema) + schema_map[column.name] = column.type; + for (const auto & sample_block_column : sample_block) + { + auto it = schema_map.find(sample_block_column.name); + if (unlikely(it == schema_map.end())) + throw TiFlashException( + fmt::format("schema {} doesn't contain sample block column: {}", schemaToString(schema),
sample_block_column.name), + Errors::Planner::Internal); + + const auto & type_in_schema = it->second->getName(); + const auto & type_in_sample_block = sample_block_column.type->getName(); + if (unlikely(type_in_sample_block != type_in_schema)) + throw TiFlashException( + fmt::format( + "the type of column `{}` in schema `{}` is different from the one in sample block `{}`", + sample_block_column.name, + type_in_schema, + type_in_sample_block), + Errors::Planner::Internal); + } +} +} // namespace DB::FinalizeHelper diff --git a/dbms/src/Flash/Planner/FinalizeHelper.h b/dbms/src/Flash/Planner/FinalizeHelper.h new file mode 100644 index 00000000000..7e9bd3681ce --- /dev/null +++ b/dbms/src/Flash/Planner/FinalizeHelper.h @@ -0,0 +1,34 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace DB::FinalizeHelper +{ +void prependProjectInputIfNeed(ExpressionActionsPtr & actions, size_t columns_from_previous); + +void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require); + +void checkParentRequireContainsSchema(const Names & parent_require, const NamesAndTypes & schema); + +void checkSampleBlockContainsSchema(const Block & sample_block, const NamesAndTypes & schema); + +void checkSchemaContainsSampleBlock(const NamesAndTypes & schema, const Block & sample_block); +} // namespace DB::FinalizeHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 73fcb839ae1..db97423772b 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -12,62 +12,149 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#include #include -#include +#include #include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include namespace DB { -PhysicalPlan::PhysicalPlan( - const String & executor_id_, - const PlanType & type_, - const NamesAndTypes & schema_, - const String & req_id) - : executor_id(executor_id_) - , type(type_) - , schema(schema_) - , log(Logger::get(type_.toString(), req_id)) -{} +void PhysicalPlan::build(const tipb::DAGRequest * dag_request) +{ + assert(dag_request); + ExecutorIdGenerator id_generator; + traverseExecutorsReverse( + dag_request, + [&](const tipb::Executor & executor) { + build(id_generator.generate(executor), &executor); + return true; + }); +} + +void PhysicalPlan::build(const String & executor_id, const tipb::Executor * executor) +{ + assert(executor); + switch (executor->tp()) + { + case tipb::ExecType::TypeLimit: + pushBack(PhysicalLimit::build(executor_id, log, executor->limit(), popBack())); + break; + case tipb::ExecType::TypeTopN: + pushBack(PhysicalTopN::build(context, executor_id, log, executor->topn(), popBack())); + break; + case tipb::ExecType::TypeSelection: + pushBack(PhysicalFilter::build(context, executor_id, log, executor->selection(), popBack())); + break; + case tipb::ExecType::TypeAggregation: + case tipb::ExecType::TypeStreamAgg: + pushBack(PhysicalAggregation::build(context, executor_id, log, executor->aggregation(), popBack())); + break; + case tipb::ExecType::TypeExchangeSender: + { + if (unlikely(dagContext().isTest())) + pushBack(PhysicalMockExchangeSender::build(executor_id, log, popBack())); + else + pushBack(PhysicalExchangeSender::build(executor_id, log, executor->exchange_sender(), popBack())); + break; + } + case tipb::ExecType::TypeExchangeReceiver: + { + if (unlikely(dagContext().isTest())) + pushBack(PhysicalMockExchangeReceiver::build(context, executor_id, log, executor->exchange_receiver())); + else + pushBack(PhysicalExchangeReceiver::build(context, executor_id, log)); + break; + } + case tipb::ExecType::TypeProjection: + pushBack(PhysicalProjection::build(context, executor_id, log, executor->projection(), popBack())); + break; + default: + throw TiFlashException(fmt::format("{} executor is not supported", executor->tp()), Errors::Planner::Unimplemented); + } +} -String PhysicalPlan::toString() +void PhysicalPlan::buildFinalProjection(const String & column_prefix, bool is_root) { - auto schema_to_string = [&]() { - FmtBuffer buffer; - buffer.joinStr( - schema.cbegin(), - schema.cend(), - [](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); }, - ", "); - return buffer.toString(); - }; - return fmt::format( - "type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}", - type.toString(), - executor_id, - is_record_profile_streams, - schema_to_string()); + const auto & final_projection = is_root + ? 
PhysicalProjection::buildRootFinal( + context, + log, + dagContext().output_field_types, + dagContext().output_offsets, + column_prefix, + dagContext().keep_session_timezone_info, + popBack()) + : PhysicalProjection::buildNonRootFinal( + context, + log, + column_prefix, + popBack()); + pushBack(final_projection); } -void PhysicalPlan::finalize() +DAGContext & PhysicalPlan::dagContext() const { - finalize(PhysicalPlanHelper::schemaToNames(schema)); + return *context.getDAGContext(); } -void PhysicalPlan::recordProfileStreams(DAGPipeline & pipeline, const Context & context) +void PhysicalPlan::pushBack(const PhysicalPlanNodePtr & plan_node) { - if (is_record_profile_streams) - { - auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id]; - pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); - } + assert(plan_node); + cur_plan_nodes.push_back(plan_node); +} + +PhysicalPlanNodePtr PhysicalPlan::popBack() +{ + if (unlikely(cur_plan_nodes.empty())) + throw TiFlashException("cur_plan_nodes is empty, cannot popBack", Errors::Planner::Internal); + PhysicalPlanNodePtr back = cur_plan_nodes.back(); + assert(back); + cur_plan_nodes.pop_back(); + return back; +} + +void PhysicalPlan::buildSource(const BlockInputStreams & source_streams) +{ + pushBack(PhysicalSource::build(source_streams, log)); +} + +void PhysicalPlan::outputAndOptimize() +{ + RUNTIME_ASSERT(!root_node, log, "root_node should be nullptr before `outputAndOptimize`"); + RUNTIME_ASSERT(cur_plan_nodes.size() == 1, log, "There can only be one plan node output, but there are {}", cur_plan_nodes.size()); + root_node = popBack(); + + LOG_FMT_DEBUG( + log, + "build unoptimized physical plan: \n{}", + toString()); + + root_node = optimize(context, root_node); + RUNTIME_ASSERT(root_node, log, "root_node shouldn't be nullptr after `outputAndOptimize`"); +} + +String PhysicalPlan::toString() const +{ + assert(root_node); + return PhysicalPlanVisitor::visitToString(root_node); } void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t max_streams) { - transformImpl(pipeline, context, max_streams); - recordProfileStreams(pipeline, context); + assert(root_node); + root_node->transform(pipeline, context, max_streams); } } // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlan.h b/dbms/src/Flash/Planner/PhysicalPlan.h index 8a69545f10b..0dfdda1b941 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.h +++ b/dbms/src/Flash/Planner/PhysicalPlan.h @@ -14,69 +14,53 @@ #pragma once +#include #include -#include -#include -#include -#include - -#include +#include +#include +#include +#include +#include namespace DB { -struct DAGPipeline; -class Context; -class DAGContext; - -class PhysicalPlan; -using PhysicalPlanPtr = std::shared_ptr<PhysicalPlan>; - class PhysicalPlan { public: - PhysicalPlan( - const String & executor_id_, - const PlanType & type_, - const NamesAndTypes & schema_, - const String & req_id); - - virtual ~PhysicalPlan() = default; - - virtual PhysicalPlanPtr children(size_t /*i*/) const = 0; - - virtual void setChild(size_t /*i*/, const PhysicalPlanPtr & /*new_child*/) = 0; + explicit PhysicalPlan(Context & context_, const String & req_id) + : context(context_) + , log(Logger::get("PhysicalPlan", req_id)) + {} - const PlanType & tp() const { return type; } + void build(const tipb::DAGRequest * dag_request); - const String & execId() const { return executor_id; } + void build(const String & executor_id, const tipb::Executor * executor); - const NamesAndTypes &
getSchema() const { return schema; } + void buildSource(const BlockInputStreams & source_streams); - virtual void appendChild(const PhysicalPlanPtr & /*new_child*/) = 0; + void buildFinalProjection(const String & column_prefix, bool is_root); - virtual size_t childrenSize() const = 0; + // after outputAndOptimize, the physical plan node tree is done. + void outputAndOptimize(); - virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams); + String toString() const; - virtual void finalize(const Names & parent_require) = 0; - void finalize(); + void transform(DAGPipeline & pipeline, Context & context, size_t max_streams); - /// Obtain a sample block that contains the names and types of result columns. - virtual const Block & getSampleBlock() const = 0; +private: + PhysicalPlanNodePtr popBack(); - void disableRecordProfileStreams() { is_record_profile_streams = false; } + void pushBack(const PhysicalPlanNodePtr & plan); - String toString(); + DAGContext & dagContext() const; -protected: - virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/){}; +private: + std::vector cur_plan_nodes{}; - void recordProfileStreams(DAGPipeline & pipeline, const Context & context); + // hold the root node of physical plan node tree after `outputAndOptimize`. + PhysicalPlanNodePtr root_node; - String executor_id; - PlanType type; - NamesAndTypes schema; - bool is_record_profile_streams = true; + Context & context; LoggerPtr log; }; diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h b/dbms/src/Flash/Planner/PhysicalPlanBuilder.h deleted file mode 100644 index bc97d84f5b3..00000000000 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.h +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#pragma once - -#include -#include -#include -#include - -namespace DB -{ -class PhysicalPlanBuilder -{ -public: - explicit PhysicalPlanBuilder(Context & context_, const String & req_id) - : context(context_) - , log(Logger::get("PhysicalPlanBuilder", req_id)) - {} - - void buildSource(const Block & sample_block); - - PhysicalPlanPtr getResult() const - { - RUNTIME_ASSERT(cur_plans.size() == 1, log, "There can only be one plan output, but here are {}", cur_plans.size()); - return cur_plans.back(); - } - -private: - std::vector cur_plans; - - [[maybe_unused]] Context & context; - - LoggerPtr log; -}; -} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp index 456ea70101e..cf289fe695a 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.cpp @@ -24,4 +24,33 @@ Names schemaToNames(const NamesAndTypes & schema) names.push_back(column.name); return names; } + +ExpressionActionsPtr newActions(const Block & input_block, const Context & context) +{ + const ColumnsWithTypeAndName & actions_input_columns = input_block.getColumnsWithTypeAndName(); + return std::make_shared(actions_input_columns, context.getSettingsRef()); +} + +ExpressionActionsPtr newActions(const NamesAndTypes & input_columns, const Context & context) +{ + NamesAndTypesList actions_input_column; + std::unordered_set column_name_set; + for (const auto & col : input_columns) + { + if (column_name_set.find(col.name) == column_name_set.end()) + { + actions_input_column.emplace_back(col.name, col.type); + column_name_set.emplace(col.name); + } + } + return std::make_shared(actions_input_column, context.getSettingsRef()); +} + +Block constructBlockFromSchema(const NamesAndTypes & schema) +{ + ColumnsWithTypeAndName columns; + for (const auto & column : schema) + columns.emplace_back(column.type, column.name); + return Block(columns); +} } // namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlanHelper.h b/dbms/src/Flash/Planner/PhysicalPlanHelper.h index 8a39921ec51..2dfa3b47563 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanHelper.h +++ b/dbms/src/Flash/Planner/PhysicalPlanHelper.h @@ -14,9 +14,18 @@ #pragma once +#include #include +#include +#include namespace DB::PhysicalPlanHelper { Names schemaToNames(const NamesAndTypes & schema); + +ExpressionActionsPtr newActions(const Block & input_block, const Context & context); + +ExpressionActionsPtr newActions(const NamesAndTypes & input_columns, const Context & context); + +Block constructBlockFromSchema(const NamesAndTypes & schema); } // namespace DB::PhysicalPlanHelper diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.cpp b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp new file mode 100644 index 00000000000..293dae3bb28 --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.cpp @@ -0,0 +1,75 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNode::PhysicalPlanNode( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id) + : executor_id(executor_id_) + , type(type_) + , schema(schema_) + , log(Logger::get(type_.toString(), req_id)) +{} + +String PhysicalPlanNode::toString() +{ + auto schema_to_string = [&]() { + FmtBuffer buffer; + buffer.joinStr( + schema.cbegin(), + schema.cend(), + [](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); }, + ", "); + return buffer.toString(); + }; + return fmt::format( + "<{}, {}> | is_record_profile_streams: {}, schema: {}", + type.toString(), + executor_id, + is_record_profile_streams, + schema_to_string()); +} + +void PhysicalPlanNode::finalize() +{ + finalize(PhysicalPlanHelper::schemaToNames(schema)); +} + +void PhysicalPlanNode::recordProfileStreams(DAGPipeline & pipeline, const Context & context) +{ + auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id]; + pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); +} + +void PhysicalPlanNode::transform(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + transformImpl(pipeline, context, max_streams); + if (is_record_profile_streams) + recordProfileStreams(pipeline, context); + // todo modify logic after supporting window function. + context.getDAGContext()->updateFinalConcurrency(pipeline.streams.size(), max_streams); + restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanNode.h b/dbms/src/Flash/Planner/PhysicalPlanNode.h new file mode 100644 index 00000000000..47d41dd68ff --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanNode.h @@ -0,0 +1,81 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +struct DAGPipeline; +class Context; +class DAGContext; + +class PhysicalPlanNode; +using PhysicalPlanNodePtr = std::shared_ptr<PhysicalPlanNode>; + +class PhysicalPlanNode +{ +public: + PhysicalPlanNode( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id); + + virtual ~PhysicalPlanNode() = default; + + virtual PhysicalPlanNodePtr children(size_t /*i*/) const = 0; + + virtual void setChild(size_t /*i*/, const PhysicalPlanNodePtr & /*new_child*/) = 0; + + const PlanType & tp() const { return type; } + + const String & execId() const { return executor_id; } + + const NamesAndTypes & getSchema() const { return schema; } + + virtual size_t childrenSize() const = 0; + + virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams); + + virtual void finalize(const Names & parent_require) = 0; + void finalize(); + + /// Obtain a sample block that contains the names and types of result columns. + virtual const Block & getSampleBlock() const = 0; + + void disableRecordProfileStreams() { is_record_profile_streams = false; } + + String toString(); + +protected: + virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/){}; + + void recordProfileStreams(DAGPipeline & pipeline, const Context & context); + + String executor_id; + PlanType type; + NamesAndTypes schema; + bool is_record_profile_streams = true; + + LoggerPtr log; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/PhysicalPlanVisitor.cpp b/dbms/src/Flash/Planner/PhysicalPlanVisitor.cpp new file mode 100644 index 00000000000..1b9e6bb89f4 --- /dev/null +++ b/dbms/src/Flash/Planner/PhysicalPlanVisitor.cpp @@ -0,0 +1,54 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#include +#include + +namespace DB::PhysicalPlanVisitor +{ +namespace +{ +void addPrefix(FmtBuffer & buffer, size_t level) +{ + buffer.append(String(level, ' ')); +} + +void doVisitToString(FmtBuffer & buffer, const PhysicalPlanNodePtr & physical_plan, size_t level) +{ + visit(physical_plan, [&buffer, &level](const PhysicalPlanNodePtr & plan) { + assert(plan); + addPrefix(buffer, level); + buffer.fmtAppend("{}\n", plan->toString()); + ++level; + if (plan->childrenSize() <= 1) + { + return true; + } + else + { + for (size_t i = 0; i < plan->childrenSize(); ++i) + doVisitToString(buffer, plan->children(i), level); + return false; + } + }); +} +} // namespace + +String visitToString(const PhysicalPlanNodePtr & plan) +{ + FmtBuffer buffer; + doVisitToString(buffer, plan, 0); + return buffer.toString(); +} +} // namespace DB::PhysicalPlanVisitor diff --git a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp b/dbms/src/Flash/Planner/PhysicalPlanVisitor.h similarity index 52% rename from dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp rename to dbms/src/Flash/Planner/PhysicalPlanVisitor.h index b4037746ae5..c2d6c701f6c 100644 --- a/dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlanVisitor.h @@ -12,13 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include +#include -namespace DB +namespace DB::PhysicalPlanVisitor { -void PhysicalPlanBuilder::buildSource(const Block & sample_block) +/// visit physical plan node tree and apply function. +/// f: (const PhysicalPlanNodePtr &) -> bool, return true to continue visit. +template <typename FF> +void visit(const PhysicalPlanNodePtr & plan, FF && f) { - cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier())); + if (f(plan)) + { + for (size_t i = 0; i < plan->childrenSize(); ++i) + { + visit(plan->children(i), std::forward<FF>(f)); + } + } } -} // namespace DB + +String visitToString(const PhysicalPlanNodePtr & plan); +} // namespace DB::PhysicalPlanVisitor diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp index 131a9c13b3a..e04f08d005f 100644 --- a/dbms/src/Flash/Planner/PlanType.cpp +++ b/dbms/src/Flash/Planner/PlanType.cpp @@ -23,6 +23,24 @@ String PlanType::toString() const { case Source: return "Source"; + case Limit: + return "Limit"; + case TopN: + return "TopN"; + case Filter: + return "Filter"; + case Aggregation: + return "Aggregation"; + case ExchangeSender: + return "ExchangeSender"; + case MockExchangeSender: + return "MockExchangeSender"; + case ExchangeReceiver: + return "ExchangeReceiver"; + case MockExchangeReceiver: + return "MockExchangeReceiver"; + case Projection: + return "Projection"; default: throw TiFlashException("Unknown PlanType", Errors::Planner::Internal); } diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index 9a5f26a497b..7cd0c28d2e8 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -23,6 +23,15 @@ struct PlanType enum PlanTypeEnum { Source = 0, + Limit = 1, + TopN = 2, + Filter = 3, + Aggregation = 4, + ExchangeSender = 5, + MockExchangeSender = 6, + ExchangeReceiver = 7, + MockExchangeReceiver = 8, + Projection = 9, }; PlanTypeEnum enum_value; diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp index 3ccfc1234d3..b798123de71 100644 --- a/dbms/src/Flash/Planner/Planner.cpp +++ b/dbms/src/Flash/Planner/Planner.cpp @@ -15,13 +15,52 @@ #include #include #include
-#include +#include #include -#include #include +#include namespace DB { +namespace +{ +void analyzePhysicalPlan(Context & context, PhysicalPlan & physical_plan, const DAGQueryBlock & query_block) +{ + assert(query_block.source); + physical_plan.build(query_block.source_name, query_block.source); + + // selection on table scan had been executed in table scan. + // In test mode, filter is not pushed down to table scan. + if (query_block.selection && (!query_block.isTableScanSource() || context.getDAGContext()->isTest())) + { + physical_plan.build(query_block.selection_name, query_block.selection); + } + + if (query_block.aggregation) + { + physical_plan.build(query_block.aggregation_name, query_block.aggregation); + + if (query_block.having) + { + physical_plan.build(query_block.having_name, query_block.having); + } + } + + // TopN/Limit + if (query_block.limit_or_topn) + { + physical_plan.build(query_block.limit_or_topn_name, query_block.limit_or_topn); + } + + physical_plan.buildFinalProjection(query_block.qb_column_prefix, query_block.isRootQueryBlock()); + + if (query_block.exchange_sender) + { + physical_plan.build(query_block.exchange_sender_name, query_block.exchange_sender); + } +} +} // namespace + Planner::Planner( Context & context_, const std::vector & input_streams_vec_, @@ -46,9 +85,11 @@ BlockInputStreams Planner::execute() return pipeline.streams; } -bool Planner::isSupported(const DAGQueryBlock &) +bool Planner::isSupported(const DAGQueryBlock & query_block) { - return false; + return query_block.source + && (query_block.source->tp() == tipb::ExecType::TypeProjection + || query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver); } DAGContext & Planner::dagContext() const @@ -64,15 +105,22 @@ void Planner::restorePipelineConcurrency(DAGPipeline & pipeline) void Planner::executeImpl(DAGPipeline & pipeline) { - PhysicalPlanBuilder builder{context, log->identifier()}; + PhysicalPlan physical_plan{context, log->identifier()}; for (const auto & input_streams : input_streams_vec) { RUNTIME_ASSERT(!input_streams.empty(), log, "input streams cannot be empty"); - builder.buildSource(input_streams.back()->getHeader()); + physical_plan.buildSource(input_streams); } - auto physical_plan = builder.getResult(); - physical_plan = optimize(context, physical_plan); - physical_plan->transform(pipeline, context, max_streams); + analyzePhysicalPlan(context, physical_plan, query_block); + + physical_plan.outputAndOptimize(); + + LOG_FMT_DEBUG( + log, + "build physical plan: \n{}", + physical_plan.toString()); + + physical_plan.transform(pipeline, context, max_streams); } } // namespace DB diff --git a/dbms/src/Flash/Planner/optimize.cpp b/dbms/src/Flash/Planner/optimize.cpp index 244ddd534b6..1e16bcb5469 100644 --- a/dbms/src/Flash/Planner/optimize.cpp +++ b/dbms/src/Flash/Planner/optimize.cpp @@ -20,7 +20,7 @@ namespace DB class Rule { public: - virtual PhysicalPlanPtr apply(const Context & context, PhysicalPlanPtr plan) = 0; + virtual PhysicalPlanNodePtr apply(const Context & context, PhysicalPlanNodePtr plan) = 0; virtual ~Rule() = default; }; @@ -29,7 +29,7 @@ using RulePtr = std::shared_ptr; class FinalizeRule : public Rule { public: - PhysicalPlanPtr apply(const Context &, PhysicalPlanPtr plan) override + PhysicalPlanNodePtr apply(const Context &, PhysicalPlanNodePtr plan) override { plan->finalize(); return plan; @@ -38,12 +38,13 @@ class FinalizeRule : public Rule static RulePtr create() { return std::make_shared(); } }; -PhysicalPlanPtr optimize(const Context & context, 
PhysicalPlanPtr plan) +PhysicalPlanNodePtr optimize(const Context & context, PhysicalPlanNodePtr plan) { assert(plan); static std::vector rules{FinalizeRule::create()}; for (const auto & rule : rules) plan = rule->apply(context, plan); + assert(plan); return plan; } } // namespace DB diff --git a/dbms/src/Flash/Planner/optimize.h b/dbms/src/Flash/Planner/optimize.h index 8ba738c9f77..8053ba5532b 100644 --- a/dbms/src/Flash/Planner/optimize.h +++ b/dbms/src/Flash/Planner/optimize.h @@ -14,10 +14,10 @@ #pragma once -#include +#include namespace DB { class Context; -PhysicalPlanPtr optimize(const Context & context, PhysicalPlanPtr plan); +PhysicalPlanNodePtr optimize(const Context & context, PhysicalPlanNodePtr plan); } // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp new file mode 100644 index 00000000000..45e4586dd18 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp @@ -0,0 +1,178 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalAggregation::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Aggregation & aggregation, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + if (unlikely(aggregation.group_by_size() == 0 && aggregation.agg_func_size() == 0)) + { + //should not reach here + throw TiFlashException("Aggregation executor without group by/agg exprs", Errors::Planner::BadRequest); + } + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr before_agg_actions = PhysicalPlanHelper::newActions(child->getSampleBlock(), context); + NamesAndTypes aggregated_columns; + AggregateDescriptions aggregate_descriptions; + Names aggregation_keys; + TiDB::TiDBCollators collators; + { + std::unordered_set agg_key_set; + analyzer.buildAggFuncs(aggregation, before_agg_actions, aggregate_descriptions, aggregated_columns); + analyzer.buildAggGroupBy( + aggregation.group_by(), + before_agg_actions, + aggregate_descriptions, + aggregated_columns, + aggregation_keys, + agg_key_set, + AggregationInterpreterHelper::isGroupByCollationSensitive(context), + collators); + } + + auto cast_after_agg_actions = PhysicalPlanHelper::newActions(aggregated_columns, context); + analyzer.reset(aggregated_columns); + analyzer.appendCastAfterAgg(cast_after_agg_actions, aggregation); + /// project action after aggregation to remove useless columns. 
+    const NamesAndTypes & schema = analyzer.getCurrentInputColumns();
+    cast_after_agg_actions->add(ExpressionAction::project(PhysicalPlanHelper::schemaToNames(schema)));
+
+    auto physical_agg = std::make_shared<PhysicalAggregation>(
+        executor_id,
+        schema,
+        log->identifier(),
+        child,
+        before_agg_actions,
+        aggregation_keys,
+        collators,
+        AggregationInterpreterHelper::isFinalAgg(aggregation),
+        aggregate_descriptions,
+        cast_after_agg_actions);
+    // For agg, `recordProfileStreams` has been called in `transformImpl`.
+    physical_agg->disableRecordProfileStreams();
+    return physical_agg;
+}
+
+void PhysicalAggregation::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams)
+{
+    child->transform(pipeline, context, max_streams);
+
+    executeExpression(pipeline, before_agg_actions, log, "before aggregation");
+
+    Block before_agg_header = pipeline.firstStream()->getHeader();
+    AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
+    auto params = AggregationInterpreterHelper::buildParams(
+        context,
+        before_agg_header,
+        pipeline.streams.size(),
+        aggregation_keys,
+        aggregation_collators,
+        aggregate_descriptions,
+        is_final_agg);
+
+    /// If there are several sources, then we perform parallel aggregation.
+    if (pipeline.streams.size() > 1)
+    {
+        const Settings & settings = context.getSettingsRef();
+        BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
+        pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
+            pipeline.streams,
+            stream_with_non_joined_data,
+            params,
+            context.getFileProvider(),
+            true,
+            max_streams,
+            settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
+            log->identifier());
+        pipeline.streams.resize(1);
+        // Profile streams must be recorded for the agg before restoring concurrency. See #3804.
+        recordProfileStreams(pipeline, context);
+        restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
+    }
+    else
+    {
+        BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
+        BlockInputStreams inputs;
+        if (!pipeline.streams.empty())
+            inputs.push_back(pipeline.firstStream());
+        else
+            pipeline.streams.resize(1);
+        if (stream_with_non_joined_data)
+            inputs.push_back(stream_with_non_joined_data);
+        pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
+            std::make_shared<ConcatBlockInputStream>(inputs, log->identifier()),
+            params,
+            context.getFileProvider(),
+            true,
+            log->identifier());
+        recordProfileStreams(pipeline, context);
+    }
+
+    executeExpression(pipeline, cast_after_agg, log, "cast after aggregation");
+}
+
+void PhysicalAggregation::finalize(const Names & parent_require)
+{
+    // schema.size() >= parent_require.size()
+    FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require);
+    cast_after_agg->finalize(PhysicalPlanHelper::schemaToNames(schema));
+
+    Names before_agg_output;
+    // Set the required output for the agg functions' arguments and the group-by keys.
+ for (const auto & aggregate_description : aggregate_descriptions) + { + for (const auto & argument_name : aggregate_description.argument_names) + before_agg_output.push_back(argument_name); + } + for (const auto & aggregation_key : aggregation_keys) + { + before_agg_output.push_back(aggregation_key); + } + + before_agg_actions->finalize(before_agg_output); + child->finalize(before_agg_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(before_agg_actions, child->getSampleBlock().columns()); + + FinalizeHelper::checkSampleBlockContainsSchema(getSampleBlock(), schema); +} + +const Block & PhysicalAggregation::getSampleBlock() const +{ + return cast_after_agg->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalAggregation.h b/dbms/src/Flash/Planner/plans/PhysicalAggregation.h new file mode 100644 index 00000000000..3d3d27384cb --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalAggregation.h @@ -0,0 +1,68 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class PhysicalAggregation : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Aggregation & aggregation, + const PhysicalPlanNodePtr & child); + + PhysicalAggregation( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const ExpressionActionsPtr & before_agg_actions_, + const Names & aggregation_keys_, + const TiDB::TiDBCollators & aggregation_collators_, + bool is_final_agg_, + const AggregateDescriptions & aggregate_descriptions_, + const ExpressionActionsPtr & castAfterAgg_) + : PhysicalUnary(executor_id_, PlanType::Aggregation, schema_, req_id, child_) + , before_agg_actions(before_agg_actions_) + , aggregation_keys(aggregation_keys_) + , aggregation_collators(aggregation_collators_) + , is_final_agg(is_final_agg_) + , aggregate_descriptions(aggregate_descriptions_) + , cast_after_agg(castAfterAgg_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + ExpressionActionsPtr before_agg_actions; + Names aggregation_keys; + TiDB::TiDBCollators aggregation_collators; + bool is_final_agg; + AggregateDescriptions aggregate_descriptions; + ExpressionActionsPtr cast_after_agg; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp new file mode 100644 index 00000000000..ee40e42e1aa --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp @@ -0,0 +1,89 @@ +// Copyright 2022 PingCAP, Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalExchangeReceiver::PhysicalExchangeReceiver( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const Block & sample_block_, + const std::shared_ptr & mpp_exchange_receiver_) + : PhysicalLeaf(executor_id_, PlanType::ExchangeReceiver, schema_, req_id) + , sample_block(sample_block_) + , mpp_exchange_receiver(mpp_exchange_receiver_) +{} + +PhysicalPlanNodePtr PhysicalExchangeReceiver::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log) +{ + const auto & mpp_exchange_receiver_map = context.getDAGContext()->getMPPExchangeReceiverMap(); + + auto it = mpp_exchange_receiver_map.find(executor_id); + if (unlikely(it == mpp_exchange_receiver_map.end())) + throw TiFlashException( + fmt::format("Can not find exchange receiver for {}", executor_id), + Errors::Planner::Internal); + + const auto & mpp_exchange_receiver = it->second; + NamesAndTypes schema = toNamesAndTypes(mpp_exchange_receiver->getOutputSchema()); + auto physical_exchange_receiver = std::make_shared( + executor_id, + schema, + log->identifier(), + PhysicalPlanHelper::constructBlockFromSchema(schema), + mpp_exchange_receiver); + return physical_exchange_receiver; +} + +void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + auto & dag_context = *context.getDAGContext(); + // todo choose a more reasonable stream number + auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id]; + for (size_t i = 0; i < max_streams; ++i) + { + BlockInputStreamPtr stream = std::make_shared(mpp_exchange_receiver, log->identifier(), executor_id); + exchange_receiver_io_input_streams.push_back(stream); + stream = std::make_shared(stream, 8192, 0, log->identifier()); + stream->setExtraInfo("squashing after exchange receiver"); + pipeline.streams.push_back(stream); + } +} + +void PhysicalExchangeReceiver::finalize(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); +} + +const Block & PhysicalExchangeReceiver::getSampleBlock() const +{ + return sample_block; +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.h b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.h new file mode 100644 index 00000000000..6dd2412d821 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.h @@ -0,0 +1,49 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ +class ExchangeReceiver; + +class PhysicalExchangeReceiver : public PhysicalLeaf +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log); + + PhysicalExchangeReceiver( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const Block & sample_block_, + const std::shared_ptr & mpp_exchange_receiver_); + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + Block sample_block; + + std::shared_ptr mpp_exchange_receiver; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp new file mode 100644 index 00000000000..373b04a3941 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp @@ -0,0 +1,83 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
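Note on the finalize contract used by the leaf nodes above: PhysicalExchangeReceiver::finalize only has to verify that its schema covers the columns its parent requires. The following is a minimal sketch of what FinalizeHelper::checkSchemaContainsParentRequire is assumed to enforce; the helper itself is not part of this diff, and the Names/NamesAndTypes aliases below are simplified stand-ins for the real types:

#include <algorithm>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using Names = std::vector<std::string>;
using NamesAndTypes = std::vector<std::pair<std::string, std::string>>; // (name, type), simplified

// Every column the parent requires must exist in this node's output schema.
void checkSchemaContainsParentRequire(const NamesAndTypes & schema, const Names & parent_require)
{
    for (const auto & name : parent_require)
    {
        bool found = std::any_of(schema.begin(), schema.end(), [&](const auto & col) { return col.first == name; });
        if (!found)
            throw std::runtime_error("schema doesn't contain required column: " + name);
    }
}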
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+PhysicalPlanNodePtr PhysicalExchangeSender::build(
+    const String & executor_id,
+    const LoggerPtr & log,
+    const tipb::ExchangeSender & exchange_sender,
+    const PhysicalPlanNodePtr & child)
+{
+    assert(child);
+
+    std::vector<Int64> partition_col_ids = ExchangeSenderInterpreterHelper::genPartitionColIds(exchange_sender);
+    TiDB::TiDBCollators partition_col_collators = ExchangeSenderInterpreterHelper::genPartitionColCollators(exchange_sender);
+
+    auto physical_exchange_sender = std::make_shared<PhysicalExchangeSender>(
+        executor_id,
+        child->getSchema(),
+        log->identifier(),
+        child,
+        partition_col_ids,
+        partition_col_collators,
+        exchange_sender.tp());
+    return physical_exchange_sender;
+}
+
+void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams)
+{
+    child->transform(pipeline, context, max_streams);
+
+    auto & dag_context = *context.getDAGContext();
+    restoreConcurrency(pipeline, dag_context.final_concurrency, log);
+
+    RUNTIME_ASSERT(dag_context.isMPPTask() && dag_context.tunnel_set != nullptr, log, "exchange_sender only runs in MPP");
+
+    int stream_id = 0;
+    pipeline.transform([&](auto & stream) {
+        // construct writer
+        std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>(
+            dag_context.tunnel_set,
+            partition_col_ids,
+            partition_col_collators,
+            exchange_type,
+            context.getSettingsRef().dag_records_per_chunk,
+            context.getSettingsRef().batch_send_min_limit,
+            stream_id++ == 0, /// only one stream needs to send the execution summaries in the last response
+            dag_context);
+        stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
+    });
+}
+
+void PhysicalExchangeSender::finalize(const Names & parent_require)
+{
+    child->finalize(parent_require);
+}
+
+const Block & PhysicalExchangeSender::getSampleBlock() const
+{
+    return child->getSampleBlock();
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.h b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.h
new file mode 100644
index 00000000000..1323674c3d4
--- /dev/null
+++ b/dbms/src/Flash/Planner/plans/PhysicalExchangeSender.h
@@ -0,0 +1,57 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
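The writer constructed in PhysicalExchangeSender::transformImpl routes each row to one MPP tunnel based on a hash of the partition columns. The real row-to-tunnel mapping lives in StreamingDAGResponseWriter, which is not part of this diff; the sketch below, with hypothetical names, only illustrates the general hash-partitioning idea:

#include <functional>
#include <string>
#include <vector>

// Pick the target partition (and thus the tunnel) for one row, given the
// string representations of its partition-column values. Illustrative only.
size_t pickPartition(const std::vector<std::string> & partition_col_values, size_t partition_num)
{
    size_t hash = 0;
    for (const auto & v : partition_col_values)
        hash = hash * 31 + std::hash<std::string>{}(v); // combine per-column hashes
    return hash % partition_num; // each partition maps to one MPP tunnel
}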
+ +#pragma once + +#include +#include +#include + +namespace DB +{ +class PhysicalExchangeSender : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const String & executor_id, + const LoggerPtr & log, + const tipb::ExchangeSender & exchange_sender, + const PhysicalPlanNodePtr & child); + + PhysicalExchangeSender( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const std::vector & partition_col_ids_, + const TiDB::TiDBCollators & collators_, + const tipb::ExchangeType & exchange_type_) + : PhysicalUnary(executor_id_, PlanType::ExchangeSender, schema_, req_id, child_) + , partition_col_ids(partition_col_ids_) + , partition_col_collators(collators_) + , exchange_type(exchange_type_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + std::vector partition_col_ids; + TiDB::TiDBCollators partition_col_collators; + tipb::ExchangeType exchange_type; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalFilter.cpp b/dbms/src/Flash/Planner/plans/PhysicalFilter.cpp new file mode 100644 index 00000000000..c014f3b54af --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalFilter.cpp @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
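PhysicalFilter::build above folds all of the selection's conditions into one boolean filter column via DAGExpressionAnalyzer::buildFilterColumn, and FilterBlockInputStream later tests that single column. The sketch below only mimics how the produced column could be named; the real code builds expression actions, not strings:

#include <string>
#include <vector>

// One condition is used as the filter column directly; several conditions
// are assumed to be combined into a single "and(...)" column.
std::string buildFilterColumnName(const std::vector<std::string> & condition_columns)
{
    if (condition_columns.size() == 1)
        return condition_columns.front();
    std::string filter = "and(";
    for (size_t i = 0; i < condition_columns.size(); ++i)
        filter += (i == 0 ? "" : ", ") + condition_columns[i];
    return filter + ")";
}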
+ +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalFilter::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Selection & selection, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr before_filter_actions = PhysicalPlanHelper::newActions(child->getSampleBlock(), context); + + std::vector conditions; + for (const auto & c : selection.conditions()) + conditions.push_back(&c); + String filter_column_name = analyzer.buildFilterColumn(before_filter_actions, conditions); + + auto physical_filter = std::make_shared( + executor_id, + child->getSchema(), + log->identifier(), + child, + filter_column_name, + before_filter_actions); + + return physical_filter; +} + +void PhysicalFilter::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + child->transform(pipeline, context, max_streams); + + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, before_filter_actions, filter_column, log->identifier()); }); +} + +void PhysicalFilter::finalize(const Names & parent_require) +{ + Names required_output = parent_require; + required_output.push_back(filter_column); + before_filter_actions->finalize(required_output); + + child->finalize(before_filter_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(before_filter_actions, child->getSampleBlock().columns()); + + FinalizeHelper::checkSampleBlockContainsSchema(getSampleBlock(), schema); +} + +const Block & PhysicalFilter::getSampleBlock() const +{ + return before_filter_actions->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalFilter.h b/dbms/src/Flash/Planner/plans/PhysicalFilter.h new file mode 100644 index 00000000000..27d050b4a61 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalFilter.h @@ -0,0 +1,55 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
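The finalize implementation above ends with FinalizeHelper::prependProjectInputIfNeed. Its assumed intent, sketched with a hypothetical stand-in for ExpressionActions: when the finalized actions read fewer columns than the child outputs, prepend a project so the unused child columns are dropped as early as possible.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in: records which input columns the finalized actions read.
struct ActionsSketch
{
    std::vector<std::string> required_input;
    bool has_leading_project = false;
};

void prependProjectInputIfNeed(ActionsSketch & actions, size_t child_output_columns)
{
    if (actions.required_input.size() < child_output_columns)
        actions.has_leading_project = true; // keep only the columns actually read
}

int main()
{
    ActionsSketch actions{{"s1", "s2"}};
    prependProjectInputIfNeed(actions, /*child_output_columns=*/4);
    std::cout << actions.has_leading_project << '\n'; // prints 1: a project was prepended
}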
+ +#pragma once + +#include +#include +#include + +namespace DB +{ +class PhysicalFilter : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Selection & selection, + const PhysicalPlanNodePtr & child); + + PhysicalFilter( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const String & filter_column_, + const ExpressionActionsPtr & before_filter_actions_) + : PhysicalUnary(executor_id_, PlanType::Filter, schema_, req_id, child_) + , filter_column(filter_column_) + , before_filter_actions(before_filter_actions_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + String filter_column; + ExpressionActionsPtr before_filter_actions; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h index 50ced412c13..343ab66625c 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalLeaf.h +++ b/dbms/src/Flash/Planner/plans/PhysicalLeaf.h @@ -15,14 +15,14 @@ #pragma once #include -#include +#include namespace DB { /** * A physical plan node with no children. */ -class PhysicalLeaf : public PhysicalPlan +class PhysicalLeaf : public PhysicalPlanNode { public: PhysicalLeaf( @@ -30,20 +30,15 @@ class PhysicalLeaf : public PhysicalPlan const PlanType & type_, const NamesAndTypes & schema_, const String & req_id) - : PhysicalPlan(executor_id_, type_, schema_, req_id) + : PhysicalPlanNode(executor_id_, type_, schema_, req_id) {} - PhysicalPlanPtr children(size_t) const override + PhysicalPlanNodePtr children(size_t) const override { throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal); } - void setChild(size_t, const PhysicalPlanPtr &) override - { - throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal); - } - - void appendChild(const PhysicalPlanPtr &) override + void setChild(size_t, const PhysicalPlanNodePtr &) override { throw TiFlashException("the children size of PhysicalLeaf is zero", Errors::Planner::Internal); } diff --git a/dbms/src/Flash/Planner/plans/PhysicalLimit.cpp b/dbms/src/Flash/Planner/plans/PhysicalLimit.cpp new file mode 100644 index 00000000000..2722e7e1869 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalLimit.cpp @@ -0,0 +1,62 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
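The PhysicalLimit implementation that follows applies the limit twice, once per stream and once globally. A worked example of why both passes are needed, as comments:

// With limit = 2 and two input streams:
//
//   stream A: 5 rows --LimitBlockInputStream(2)--> 2 rows \
//                                                          union -> 4 rows --Limit(2)--> 2 rows
//   stream B: 3 rows --LimitBlockInputStream(2)--> 2 rows /
//
// The partial limit caps the work each stream does; the global limit after
// the union enforces the correct final row count.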
+ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalLimit::build( + const String & executor_id, + const LoggerPtr & log, + const tipb::Limit & limit, + const PhysicalPlanNodePtr & child) +{ + assert(child); + auto physical_limit = std::make_shared( + executor_id, + child->getSchema(), + log->identifier(), + child, + limit.limit()); + return physical_limit; +} + +void PhysicalLimit::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + child->transform(pipeline, context, max_streams); + + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, 0, log->identifier(), false); }); + if (pipeline.hasMoreThanOneStream()) + { + executeUnion(pipeline, max_streams, log, false, "for partial limit"); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, 0, log->identifier(), false); }); + } +} + +void PhysicalLimit::finalize(const Names & parent_require) +{ + child->finalize(parent_require); +} + +const Block & PhysicalLimit::getSampleBlock() const +{ + return child->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalLimit.h b/dbms/src/Flash/Planner/plans/PhysicalLimit.h new file mode 100644 index 00000000000..24f6fe83044 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalLimit.h @@ -0,0 +1,50 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ +class PhysicalLimit : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const String & executor_id, + const LoggerPtr & log, + const tipb::Limit & limit, + const PhysicalPlanNodePtr & child); + + PhysicalLimit( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + size_t limit_) + : PhysicalUnary(executor_id_, PlanType::Limit, schema_, req_id, child_) + , limit(limit_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + size_t limit; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp new file mode 100644 index 00000000000..1dc5b680937 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.cpp @@ -0,0 +1,86 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalMockExchangeReceiver::PhysicalMockExchangeReceiver( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const Block & sample_block_, + const BlockInputStreams & mock_streams_) + : PhysicalLeaf(executor_id_, PlanType::MockExchangeReceiver, schema_, req_id) + , sample_block(sample_block_) + , mock_streams(mock_streams_) +{} + +PhysicalPlanNodePtr PhysicalMockExchangeReceiver::build( + Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::ExchangeReceiver & exchange_receiver) +{ + NamesAndTypes schema; + BlockInputStreams mock_streams; + auto & dag_context = *context.getDAGContext(); + size_t max_streams = dag_context.initialize_concurrency; + if (dag_context.columnsForTestEmpty() || dag_context.columnsForTest(executor_id).empty()) + { + for (size_t i = 0; i < max_streams; ++i) + // use max_block_size / 10 to determine the mock block's size + mock_streams.push_back(std::make_shared(exchange_receiver, context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10)); + for (const auto & col : mock_streams.back()->getHeader()) + schema.emplace_back(col.name, col.type); + } + else + { + auto [names_and_types, mock_exchange_streams] = mockSourceStream(context, max_streams, log, executor_id); + schema = std::move(names_and_types); + mock_streams.insert(mock_streams.end(), mock_exchange_streams.begin(), mock_exchange_streams.end()); + } + assert(!schema.empty()); + assert(!mock_streams.empty()); + + auto physical_mock_exchange_receiver = std::make_shared( + executor_id, + schema, + log->identifier(), + PhysicalPlanHelper::constructBlockFromSchema(schema), + mock_streams); + return physical_mock_exchange_receiver; +} + +void PhysicalMockExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) +{ + pipeline.streams.insert(pipeline.streams.end(), mock_streams.begin(), mock_streams.end()); +} + +void PhysicalMockExchangeReceiver::finalize(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); +} + +const Block & PhysicalMockExchangeReceiver::getSampleBlock() const +{ + return sample_block; +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h new file mode 100644 index 00000000000..b269ee438a7 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeReceiver.h @@ -0,0 +1,50 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +class PhysicalMockExchangeReceiver : public PhysicalLeaf +{ +public: + static PhysicalPlanNodePtr build( + Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::ExchangeReceiver & exchange_receiver); + + PhysicalMockExchangeReceiver( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const Block & sample_block_, + const BlockInputStreams & mock_streams_); + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) override; + + Block sample_block; + + BlockInputStreams mock_streams; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeSender.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeSender.cpp new file mode 100644 index 00000000000..8405ee2283f --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeSender.cpp @@ -0,0 +1,54 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalMockExchangeSender::build( + const String & executor_id, + const LoggerPtr & log, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + auto physical_mock_exchange_sender = std::make_shared( + executor_id, + child->getSchema(), + log->identifier(), + child); + return physical_mock_exchange_sender; +} + +void PhysicalMockExchangeSender::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + child->transform(pipeline, context, max_streams); + + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, log->identifier()); }); +} + +void PhysicalMockExchangeSender::finalize(const Names & parent_require) +{ + child->finalize(parent_require); +} + +const Block & PhysicalMockExchangeSender::getSampleBlock() const +{ + return child->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockExchangeSender.h b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeSender.h new file mode 100644 index 00000000000..bfebf34c1ea --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalMockExchangeSender.h @@ -0,0 +1,46 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +class PhysicalMockExchangeSender : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const String & executor_id, + const LoggerPtr & log, + const PhysicalPlanNodePtr & child); + + PhysicalMockExchangeSender( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_) + : PhysicalUnary(executor_id_, PlanType::MockExchangeSender, schema_, req_id, child_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp b/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp new file mode 100644 index 00000000000..0835fd557b1 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp @@ -0,0 +1,167 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalProjection::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Projection & projection, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr project_actions = PhysicalPlanHelper::newActions(child->getSampleBlock(), context); + + NamesAndTypes schema; + NamesWithAliases project_aliases; + UniqueNameGenerator unique_name_generator; + for (const auto & expr : projection.exprs()) + { + auto expr_name = analyzer.getActions(expr, project_actions); + const auto & col = project_actions->getSampleBlock().getByName(expr_name); + + String alias = unique_name_generator.toUniqueName(col.name); + project_aliases.emplace_back(col.name, alias); + schema.emplace_back(alias, col.type); + } + /// TODO When there is no alias, there is no need to add the project action. 
+ /// https://github.com/pingcap/tiflash/issues/3921 + project_actions->add(ExpressionAction::project(project_aliases)); + + auto physical_projection = std::make_shared( + executor_id, + schema, + log->identifier(), + child, + "projection", + project_actions); + return physical_projection; +} + +PhysicalPlanNodePtr PhysicalProjection::buildNonRootFinal( + const Context & context, + const LoggerPtr & log, + const String & column_prefix, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr project_actions = PhysicalPlanHelper::newActions(child->getSampleBlock(), context); + auto final_project_aliases = analyzer.genNonRootFinalProjectAliases(column_prefix); + project_actions->add(ExpressionAction::project(final_project_aliases)); + + NamesAndTypes schema = child->getSchema(); + assert(final_project_aliases.size() == schema.size()); + // replace column name of schema by alias. + for (size_t i = 0; i < final_project_aliases.size(); ++i) + { + assert(schema[i].name == final_project_aliases[i].first); + schema[i].name = final_project_aliases[i].second; + } + + auto physical_projection = std::make_shared( + "NonRootFinalProjection", + schema, + log->identifier(), + child, + "final projection", + project_actions); + // For final projection, no need to record profile streams. + physical_projection->disableRecordProfileStreams(); + return physical_projection; +} + +PhysicalPlanNodePtr PhysicalProjection::buildRootFinal( + const Context & context, + const LoggerPtr & log, + const std::vector & require_schema, + const std::vector & output_offsets, + const String & column_prefix, + bool keep_session_timezone_info, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr project_actions = PhysicalPlanHelper::newActions(child->getSampleBlock(), context); + + NamesWithAliases final_project_aliases = analyzer.buildFinalProjection( + project_actions, + require_schema, + output_offsets, + column_prefix, + keep_session_timezone_info); + + project_actions->add(ExpressionAction::project(final_project_aliases)); + + assert(final_project_aliases.size() == output_offsets.size()); + NamesAndTypes schema; + for (size_t i = 0; i < final_project_aliases.size(); ++i) + { + const auto & alias = final_project_aliases[i].second; + assert(!alias.empty()); + const auto & type = analyzer.getCurrentInputColumns()[output_offsets[i]].type; + schema.emplace_back(alias, type); + } + + auto physical_projection = std::make_shared( + "RootFinalProjection", + schema, + log->identifier(), + child, + "final projection", + project_actions); + // For final projection, no need to record profile streams. 
+ physical_projection->disableRecordProfileStreams(); + return physical_projection; +} + +void PhysicalProjection::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + child->transform(pipeline, context, max_streams); + + executeExpression(pipeline, project_actions, log, extra_info); +} + +void PhysicalProjection::finalize(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); + project_actions->finalize(parent_require); + + child->finalize(project_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(project_actions, child->getSampleBlock().columns()); + + FinalizeHelper::checkSampleBlockContainsSchema(getSampleBlock(), schema); +} + +const Block & PhysicalProjection::getSampleBlock() const +{ + return project_actions->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalProjection.h b/dbms/src/Flash/Planner/plans/PhysicalProjection.h new file mode 100644 index 00000000000..bd5e8140f28 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalProjection.h @@ -0,0 +1,76 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +class PhysicalProjection : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Projection & projection, + const PhysicalPlanNodePtr & child); + + // Generate a project action to keep the schema of Block and tidb-schema the same, + // and guarantee that left/right block of join don't have duplicated column names. + static PhysicalPlanNodePtr buildNonRootFinal( + const Context & context, + const LoggerPtr & log, + const String & column_prefix, + const PhysicalPlanNodePtr & child); + + // Generate a project action for root executor, + // to keep the schema of Block and tidb-schema the same. + // Because the output of the root executor is sent to other TiFlash or TiDB. 
+ static PhysicalPlanNodePtr buildRootFinal( + const Context & context, + const LoggerPtr & log, + const std::vector & require_schema, + const std::vector & output_offsets, + const String & column_prefix, + bool keep_session_timezone_info, + const PhysicalPlanNodePtr & child); + + PhysicalProjection( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const String & extra_info_, + const ExpressionActionsPtr & project_actions_) + : PhysicalUnary(executor_id_, PlanType::Projection, schema_, req_id, child_) + , extra_info(extra_info_) + , project_actions(project_actions_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + const String extra_info; + + ExpressionActionsPtr project_actions; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalSource.cpp b/dbms/src/Flash/Planner/plans/PhysicalSource.cpp new file mode 100644 index 00000000000..b694622314a --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalSource.cpp @@ -0,0 +1,37 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
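A note on the name de-duplication used by PhysicalProjection::build earlier: output aliases are made unique through UniqueNameGenerator. That class is defined elsewhere in the code base; the sketch below assumes a simple counter-suffix scheme and is not the actual implementation:

#include <string>
#include <unordered_map>

struct UniqueNameGeneratorSketch
{
    std::unordered_map<std::string, int> counts;

    // First occurrence keeps its name; later occurrences get a numeric suffix.
    std::string toUniqueName(const std::string & name)
    {
        int & n = counts[name];
        return n++ == 0 ? name : name + "_" + std::to_string(n - 1);
    }
};
// toUniqueName("s1") -> "s1"; a second call with "s1" -> "s1_1" (assumed scheme).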
+ +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalSource::build( + const BlockInputStreams & source_streams, + const LoggerPtr & log) +{ + RUNTIME_ASSERT(!source_streams.empty(), log, "source streams cannot be empty"); + Block sample_block = source_streams.back()->getHeader(); + NamesAndTypes schema; + for (const auto & col : sample_block) + schema.emplace_back(col.name, col.type); + return std::make_shared("source", schema, log->identifier(), sample_block, source_streams); +} + +void PhysicalSource::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) +{ + pipeline.streams.insert(pipeline.streams.end(), source_streams.begin(), source_streams.end()); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalSource.h b/dbms/src/Flash/Planner/plans/PhysicalSource.h index 6b6837de107..eb178583c6a 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalSource.h +++ b/dbms/src/Flash/Planner/plans/PhysicalSource.h @@ -22,28 +22,24 @@ namespace DB class PhysicalSource : public PhysicalLeaf { public: - static PhysicalPlanPtr build( - const Block & sample_block, - const String & req_id) - { - NamesAndTypes schema; - for (const auto & col : sample_block) - schema.emplace_back(col.name, col.type); - return std::make_shared("source", schema, sample_block, req_id); - } + static PhysicalPlanNodePtr build( + const BlockInputStreams & source_streams, + const LoggerPtr & log); PhysicalSource( const String & executor_id_, const NamesAndTypes & schema_, + const String & req_id, const Block & sample_block_, - const String & req_id) + const BlockInputStreams & source_streams_) : PhysicalLeaf(executor_id_, PlanType::Source, schema_, req_id) , sample_block(sample_block_) + , source_streams(source_streams_) { is_record_profile_streams = false; } - void transformImpl(DAGPipeline &, Context &, size_t) override {} + void transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) override; void finalize(const Names &) override {} @@ -51,5 +47,7 @@ class PhysicalSource : public PhysicalLeaf private: Block sample_block; + + BlockInputStreams source_streams; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalTopN.cpp b/dbms/src/Flash/Planner/plans/PhysicalTopN.cpp new file mode 100644 index 00000000000..d572435d645 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalTopN.cpp @@ -0,0 +1,86 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
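For orientation before PhysicalTopN below: orderStreams implements TopN as sort-with-limit, partially sorting and truncating each stream, then merging the sorted runs. The expected_streams strings in the tests at the end of this diff show the resulting stream tree:

// Shape of the TopN stream tree (limit = n):
//
//   MergeSorting, limit = n        <- global merge of the sorted runs
//     PartialSorting: limit = n    <- per-stream sort + truncate
//       <input stream>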
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalTopN::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::TopN & top_n, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + if (unlikely(top_n.order_by_size() == 0)) + { + //should not reach here + throw TiFlashException("TopN executor without order by exprs", Errors::Planner::BadRequest); + } + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr before_sort_actions = PhysicalPlanHelper::newActions(child->getSampleBlock(), context); + + auto order_columns = analyzer.buildOrderColumns(before_sort_actions, top_n.order_by()); + SortDescription order_descr = getSortDescription(order_columns, top_n.order_by()); + + auto physical_top_n = std::make_shared( + executor_id, + child->getSchema(), + log->identifier(), + child, + order_descr, + before_sort_actions, + top_n.limit()); + return physical_top_n; +} + +void PhysicalTopN::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + child->transform(pipeline, context, max_streams); + + executeExpression(pipeline, before_sort_actions, log, "before TopN"); + + orderStreams(pipeline, max_streams, order_descr, limit, context, log); +} + +void PhysicalTopN::finalize(const Names & parent_require) +{ + Names required_output = parent_require; + required_output.reserve(required_output.size() + order_descr.size()); + for (const auto & desc : order_descr) + required_output.push_back(desc.column_name); + before_sort_actions->finalize(required_output); + + child->finalize(before_sort_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(before_sort_actions, child->getSampleBlock().columns()); + + FinalizeHelper::checkSampleBlockContainsSchema(getSampleBlock(), schema); +} + +const Block & PhysicalTopN::getSampleBlock() const +{ + return before_sort_actions->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalTopN.h b/dbms/src/Flash/Planner/plans/PhysicalTopN.h new file mode 100644 index 00000000000..bfabb5f4261 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalTopN.h @@ -0,0 +1,59 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
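A worked example of PhysicalTopN::finalize above: if the parent requires only {s1} but the TopN orders by s2, the sort step must still see s2.

// required_output starts as parent_require = {s1}; each sort column is then
// appended, giving {s1, s2} for before_sort_actions->finalize. The child is
// asked for whatever those actions need in turn, and a project is prepended
// if the child produces more columns than the actions actually read.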
+ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class PhysicalTopN : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::TopN & top_n, + const PhysicalPlanNodePtr & child); + + PhysicalTopN( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const SortDescription & order_descr_, + const ExpressionActionsPtr & before_sort_actions_, + size_t limit_) + : PhysicalUnary(executor_id_, PlanType::TopN, schema_, req_id, child_) + , order_descr(order_descr_) + , before_sort_actions(before_sort_actions_) + , limit(limit_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + SortDescription order_descr; + ExpressionActionsPtr before_sort_actions; + size_t limit; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalUnary.h b/dbms/src/Flash/Planner/plans/PhysicalUnary.h index 4d0091bb8e3..37e0cb707b2 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalUnary.h +++ b/dbms/src/Flash/Planner/plans/PhysicalUnary.h @@ -15,7 +15,7 @@ #pragma once #include -#include +#include #include #include @@ -24,43 +24,38 @@ namespace DB /** * A physical plan node with single child. */ -class PhysicalUnary : public PhysicalPlan +class PhysicalUnary : public PhysicalPlanNode { public: PhysicalUnary( const String & executor_id_, const PlanType & type_, const NamesAndTypes & schema_, - const String & req_id) - : PhysicalPlan(executor_id_, type_, schema_, req_id) - {} - - PhysicalPlanPtr children(size_t i) const override + const String & req_id, + const PhysicalPlanNodePtr & child_) + : PhysicalPlanNode(executor_id_, type_, schema_, req_id) + , child(child_) { - RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); - assert(child); - return child; + RUNTIME_ASSERT(child, log, "children(0) shouldn't be nullptr"); } - void setChild(size_t i, const PhysicalPlanPtr & new_child) override + PhysicalPlanNodePtr children(size_t i) const override { - RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); - assert(new_child); - assert(new_child.get() != this); - child = new_child; + RUNTIME_ASSERT(i == 0, log, "child_index({}) shouldn't >= childrenSize({})", i, childrenSize()); + return child; } - void appendChild(const PhysicalPlanPtr & new_child) override + void setChild(size_t i, const PhysicalPlanNodePtr & new_child) override { - RUNTIME_ASSERT(!child, log, "the actual children size had be the max size({}), don't append child again", childrenSize()); - assert(new_child); - assert(new_child.get() != this); + RUNTIME_ASSERT(i == 0, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); + RUNTIME_ASSERT(new_child, log, "new_child for child_index({}) shouldn't be nullptr", i); + RUNTIME_ASSERT(new_child.get() != this, log, "new_child for child_index({}) shouldn't be itself", i); child = new_child; } size_t childrenSize() const override { return 1; }; protected: - PhysicalPlanPtr child; + PhysicalPlanNodePtr child; }; } // namespace DB diff --git a/dbms/src/Flash/Planner/tests/CMakeLists.txt b/dbms/src/Flash/Planner/tests/CMakeLists.txt new file mode 100644 index 00000000000..944908dcb25 --- /dev/null +++ 
b/dbms/src/Flash/Planner/tests/CMakeLists.txt @@ -0,0 +1,15 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include_directories (${CMAKE_CURRENT_BINARY_DIR}) diff --git a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp new file mode 100644 index 00000000000..7ad06fbeb04 --- /dev/null +++ b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp @@ -0,0 +1,225 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB +{ +namespace tests +{ +class PhysicalPlanTestRunner : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + context.addMockTable({"test_db", "test_table"}, + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana"})}); + context.addExchangeReceiver("exchange1", + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana"})}); + } + + void execute( + const std::shared_ptr & request, + const String & expected_physical_plan, + const String & expected_streams, + const ColumnsWithTypeAndName & expect_columns) + { + // TODO support multi-streams. + size_t max_streams = 1; + DAGContext dag_context(*request, "executor_test", max_streams); + dag_context.setColumnsForTest(context.executorIdColumnsMap()); + context.context.setDAGContext(&dag_context); + + PhysicalPlan physical_plan{context.context, log->identifier()}; + assert(request); + physical_plan.build(request.get()); + physical_plan.outputAndOptimize(); + + ASSERT_EQ(Poco::trim(expected_physical_plan), Poco::trim(physical_plan.toString())); + + BlockInputStreamPtr final_stream; + { + DAGPipeline pipeline; + physical_plan.transform(pipeline, context.context, max_streams); + // TODO support non-joined streams. 
+            assert(pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty());
+            final_stream = pipeline.firstStream();
+            FmtBuffer fb;
+            final_stream->dumpTree(fb);
+            ASSERT_EQ(Poco::trim(expected_streams), Poco::trim(fb.toString()));
+        }
+
+        readAndAssertBlock(final_stream, expect_columns);
+    }
+
+    LoggerPtr log = Logger::get("PhysicalPlanTestRunner", "test_physical_plan");
+};
+
+TEST_F(PhysicalPlanTestRunner, Filter)
+try
+{
+    auto request = context.receive("exchange1")
+                       .filter(eq(col("s1"), col("s2")))
+                       .project({col("s1"), col("s2")})
+                       .build(context);
+
+    execute(
+        request,
+        /*expected_physical_plan=*/R"(
+ | is_record_profile_streams: true, schema: ,
+ | is_record_profile_streams: true, schema: ,
+ | is_record_profile_streams: true, schema: , )",
+        /*expected_streams=*/R"(
+Expression:
+ Filter
+  MockExchangeReceiver)",
+        {toNullableVec<String>({"banana"}),
+         toNullableVec<String>({"banana"})});
+}
+CATCH
+
+TEST_F(PhysicalPlanTestRunner, Limit)
+try
+{
+    auto request = context.receive("exchange1")
+                       .limit(1)
+                       .build(context);
+
+    execute(
+        request,
+        /*expected_physical_plan=*/R"(
+ | is_record_profile_streams: true, schema: ,
+ | is_record_profile_streams: true, schema: , )",
+        /*expected_streams=*/R"(
+Limit, limit = 1
+ MockExchangeReceiver)",
+        {toNullableVec<String>({"banana"}),
+         toNullableVec<String>({"apple"})});
+}
+CATCH
+
+TEST_F(PhysicalPlanTestRunner, TopN)
+try
+{
+    auto request = context.receive("exchange1")
+                       .topN("s2", false, 1)
+                       .project({col("s1"), col("s2")})
+                       .build(context);
+
+    execute(
+        request,
+        /*expected_physical_plan=*/R"(
+ | is_record_profile_streams: true, schema: ,
+ | is_record_profile_streams: true, schema: ,
+ | is_record_profile_streams: true, schema: , )",
+        /*expected_streams=*/R"(
+Expression:
+ MergeSorting, limit = 1
+  PartialSorting: limit = 1
+   MockExchangeReceiver)",
+        {toNullableVec<String>({{}}),
+         toNullableVec<String>({{}})});
}
+CATCH
+
+// agg's schema = agg funcs + agg group bys
+TEST_F(PhysicalPlanTestRunner, Aggregation)
+try
+{
+    auto request = context.receive("exchange1")
+                       .aggregation(Max(col("s2")), col("s1"))
+                       .build(context);
+
+    execute(
+        request,
+        /*expected_physical_plan=*/R"(
+ | is_record_profile_streams: false, schema: ,
+ | is_record_profile_streams: true, schema: , )",
+        /*expected_streams=*/R"(
+Expression:
+ Aggregating
+  Concat
+   MockExchangeReceiver)",
+        {toNullableVec<String>({{}, "banana"}),
+         toNullableVec<String>({{}, "banana"})});
+}
+CATCH
+
+TEST_F(PhysicalPlanTestRunner, Projection)
+try
+{
+    auto request = context.receive("exchange1")
+                       .project({concat(col("s1"), col("s2"))})
+                       .build(context);
+
+    execute(
+        request,
+        /*expected_physical_plan=*/R"(
+ | is_record_profile_streams: true, schema:
+ | is_record_profile_streams: true, schema: , )",
+        /*expected_streams=*/R"(
+Expression:
+ MockExchangeReceiver)",
+        {toNullableVec<String>({"bananaapple", {}, "bananabanana"})});
+}
+CATCH
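+
+// Note: `concat` above is the test helper added to dbms/src/TestUtils/mockExecutor.h
+// later in this patch: #define concat(expr1, expr2) makeASTFunction("concat", (expr1), (expr2)).
+// The all-NULL input row is why expect_columns contains a NULL result row.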
is_record_profile_streams: true, schema: , )", + /*expected_streams=*/R"( +MockExchangeReceiver)", + {toNullableVec({"banana", {}, "banana"}), + toNullableVec({"apple", {}, "banana"})}); +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_executor.cpp b/dbms/src/Flash/tests/gtest_executor.cpp index 64c60f14bb6..49512b9271f 100644 --- a/dbms/src/Flash/tests/gtest_executor.cpp +++ b/dbms/src/Flash/tests/gtest_executor.cpp @@ -59,6 +59,16 @@ class ExecutorTestRunner : public DB::tests::ExecutorTest {toVec("s", {"banana", "banana"}), toVec("join_c", {"apple", "banana"})}); } + + void executeExecutor(const std::shared_ptr & request, const ColumnsWithTypeAndName & expect_columns, size_t concurrency = 1) + { + std::vector enable_planners{"true", "false"}; + for (auto enable : enable_planners) + { + context.context.setSetting("enable_planner", enable); + executeStreams(request, expect_columns, concurrency); + } + } }; TEST_F(ExecutorTestRunner, Filter) @@ -69,18 +79,18 @@ try .filter(eq(col("s1"), col("s2"))) .build(context); { - executeStreams(request, - {toNullableVec({"banana"}), - toNullableVec({"banana"})}); + executeExecutor(request, + {toNullableVec({"banana"}), + toNullableVec({"banana"})}); } request = context.receive("exchange1") .filter(eq(col("s1"), col("s2"))) .build(context); { - executeStreams(request, - {toNullableVec({"banana"}), - toNullableVec({"banana"})}); + executeExecutor(request, + {toNullableVec({"banana"}), + toNullableVec({"banana"})}); } } CATCH @@ -99,25 +109,25 @@ try " table_scan_0 | {<0, String>, <1, String>}\n" " table_scan_1 | {<0, String>, <1, String>}\n"; ASSERT_DAGREQUEST_EQAUL(expected, request); - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}, - 2); - - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}, - 5); - - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}); + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}, + 2); + + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}, + 5); + + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}); } request = context .scan("test_db", "l_table") @@ -132,10 +142,10 @@ try " table_scan_0 | {<0, String>, <1, String>}\n" " table_scan_1 | {<0, String>, <1, String>}\n"; ASSERT_DAGREQUEST_EQAUL(expected, request); - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}, - 2); + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}, + 2); } request = context @@ -149,18 +159,18 @@ try " table_scan_0 | {<0, String>, <1, String>}\n" " table_scan_1 | {<0, String>, <1, String>}\n"; ASSERT_DAGREQUEST_EQAUL(expected, request); - executeStreams(request, - {toNullableVec({"banana", "banana", "banana", "banana"}), - 
toNullableVec({"apple", "apple", "apple", "banana"}), - toNullableVec({"banana", "banana", "banana", {}}), - toNullableVec({"apple", "apple", "apple", {}})}, - 2); - executeStreams(request, - {toNullableVec({"banana", "banana", "banana", "banana"}), - toNullableVec({"apple", "apple", "apple", "banana"}), - toNullableVec({"banana", "banana", "banana", {}}), - toNullableVec({"apple", "apple", "apple", {}})}, - 3); + executeExecutor(request, + {toNullableVec({"banana", "banana", "banana", "banana"}), + toNullableVec({"apple", "apple", "apple", "banana"}), + toNullableVec({"banana", "banana", "banana", {}}), + toNullableVec({"apple", "apple", "apple", {}})}, + 2); + executeExecutor(request, + {toNullableVec({"banana", "banana", "banana", "banana"}), + toNullableVec({"apple", "apple", "apple", "banana"}), + toNullableVec({"banana", "banana", "banana", {}}), + toNullableVec({"apple", "apple", "apple", {}})}, + 3); } } CATCH @@ -179,25 +189,25 @@ try " exchange_receiver_0 | type:PassThrough, {<0, String>, <1, String>}\n" " exchange_receiver_1 | type:PassThrough, {<0, String>, <1, String>}\n"; ASSERT_DAGREQUEST_EQAUL(expected, request); - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}, - 2); - - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}, - 5); - - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}); + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}, + 2); + + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}, + 5); + + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}); } } CATCH @@ -216,15 +226,15 @@ try " table_scan_0 | {<0, String>, <1, String>}\n" " exchange_receiver_1 | type:PassThrough, {<0, String>, <1, String>}\n"; ASSERT_DAGREQUEST_EQAUL(expected, request); - executeStreams(request, - {toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"}), - toNullableVec({"banana", "banana"}), - toNullableVec({"apple", "banana"})}, - 2); + executeExecutor(request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}, + 2); } } CATCH } // namespace tests -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index acb5ae0d2c9..3840b858340 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -105,15 +105,12 @@ try Union: Expression x 10: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -127,16 
+124,14 @@ Union: Union: Expression x 10: Expression: - Expression: + Expression: SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -152,22 +147,19 @@ Union: Union: Expression x 10: Expression: - Expression: - Expression: + Expression: + Expression: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + Expression x 10: + Expression: + SharedQuery: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -184,33 +176,28 @@ Union: { String expected = R"( Union: - SharedQuery x 10: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Expression: + Expression x 10: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 Expression: - Expression: - Expression: - Expression: - Filter: - Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + Filter + Expression: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + SharedQuery: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -265,15 +252,12 @@ CreatingSets Union: Expression x 10: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockExchangeReceiver)"; + MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -289,15 +273,12 @@ Union: MockExchangeSender x 10 Expression: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockExchangeReceiver)"; + MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -419,18 +400,16 @@ Union: String expected = R"( Expression: Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); expected = R"( Union: Expression x 5: Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } @@ -442,8 +421,7 @@ Union: Expression: Aggregating Concat - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); expected = R"( @@ -451,8 +429,7 @@ Union: Expression x 5: SharedQuery: ParallelAggregating, max_threads: 5, final: true - Expression x 5: - MockTableScan)"; + MockTableScan x 5)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } @@ -508,28 +485,27 @@ Union: String expected = R"( Union: Expression x 
10: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Expression: - MockTableScan)"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); - expected = R"(Expression: - Aggregating - Concat - Expression: + expected = R"( +Expression: + Expression: + Aggregating + Concat Expression: - Expression: - Limit, limit = 10 - Expression: - MockTableScan)"; + Limit, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -542,30 +518,28 @@ Union: String expected = R"( Union: Expression x 10: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockTableScan)"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"( Expression: - Aggregating - Concat - Expression: + Expression: + Aggregating + Concat Expression: - Expression: - Expression: - MergeSorting, limit = 10 - PartialSorting: limit = 10 - MockTableScan)"; + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -578,30 +552,26 @@ Expression: String expected = R"( Union: Expression x 10: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - MockTableScan)"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + MockTableScan x 10)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"( Expression: - Aggregating - Concat - Expression: + Expression: + Aggregating + Concat Expression: - Expression: - Expression: - Aggregating - Concat - Expression: - MockTableScan)"; + Expression: + Aggregating + Concat + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -616,8 +586,7 @@ Union: Expression: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - MockTableScan)"; + MockTableScan x 10)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"( @@ -625,8 +594,7 @@ MockExchangeSender Expression: Aggregating Concat - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -706,4 +674,4 @@ CreatingSets CATCH } // namespace tests -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_qb_interpreter.cpp b/dbms/src/Flash/tests/gtest_qb_interpreter.cpp index c8ac422fdb3..9c4c15857d3 100644 --- a/dbms/src/Flash/tests/gtest_qb_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_qb_interpreter.cpp @@ -105,15 +105,12 @@ try Union: Expression x 10: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + 
Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -127,16 +124,14 @@ Union: Union: Expression x 10: Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -152,22 +147,18 @@ Union: Union: Expression x 10: Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -190,27 +181,22 @@ Union: Limit x 10, limit = 10 Expression: Expression: - Expression: - Expression: - Expression: - Filter: - Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + Expression: + Filter: + Expression: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + Expression: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -265,15 +251,12 @@ CreatingSets Union: Expression x 10: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockExchangeReceiver)"; + MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -289,15 +272,12 @@ Union: MockExchangeSender x 10 Expression: Expression: - Expression: - Expression: - Expression: - Expression: + Expression: + Expression: + Expression: + Expression: Expression: - Expression: - Expression: - Expression: - MockExchangeReceiver)"; + MockExchangeReceiver)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -419,18 +399,16 @@ Union: String expected = R"( Expression: Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); expected = R"( Union: Expression x 5: Expression: - Expression: - Expression: - MockTableScan)"; + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 5); } @@ -442,8 +420,7 @@ Union: Expression: Aggregating Concat - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); expected = R"( @@ -451,8 +428,7 @@ Union: Expression x 5: SharedQuery: ParallelAggregating, max_threads: 5, final: true - Expression x 5: - MockTableScan)"; + MockTableScan x 5)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, 
request, 5); } @@ -510,26 +486,22 @@ Union: Expression x 10: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Expression: - MockTableScan)"; + Expression x 10: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"(Expression: Aggregating Concat - Expression: - Expression: - Expression: - Limit, limit = 10 - Expression: - MockTableScan)"; + Expression: + Limit, limit = 10 + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -544,28 +516,24 @@ Union: Expression x 10: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - SharedQuery: - Expression: - MergeSorting, limit = 10 - Union: - PartialSorting x 10: limit = 10 - MockTableScan)"; + Expression x 10: + SharedQuery: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"( Expression: Aggregating Concat - Expression: - Expression: - Expression: - Expression: - MergeSorting, limit = 10 - PartialSorting: limit = 10 - MockTableScan)"; + Expression: + Expression: + MergeSorting, limit = 10 + PartialSorting: limit = 10 + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -580,28 +548,22 @@ Union: Expression x 10: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - Expression: - Expression: - Expression: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - MockTableScan)"; + Expression x 10: + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + MockTableScan x 10)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"( Expression: Aggregating Concat - Expression: - Expression: - Expression: - Expression: - Aggregating - Concat - Expression: - MockTableScan)"; + Expression: + Expression: + Aggregating + Concat + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -616,8 +578,7 @@ Union: Expression: SharedQuery: ParallelAggregating, max_threads: 10, final: true - Expression x 10: - MockTableScan)"; + MockTableScan x 10)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); expected = R"( @@ -625,8 +586,7 @@ MockExchangeSender Expression: Aggregating Concat - Expression: - MockTableScan)"; + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); } @@ -706,4 +666,4 @@ CreatingSets CATCH } // namespace tests -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 67a21d12286..91c1430f7a0 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -94,8 +94,9 @@ Block mergeBlocks(Blocks blocks) actual_columns.push_back({std::move(actual_cols[i]), sample_block.getColumnsWithTypeAndName()[i].type, sample_block.getColumnsWithTypeAndName()[i].name, sample_block.getColumnsWithTypeAndName()[i].column_id}); return Block(actual_columns); } +} // namespace -void readBlock(BlockInputStreamPtr stream, const ColumnsWithTypeAndName & expect_columns) +void ExecutorTest::readAndAssertBlock(BlockInputStreamPtr stream, const ColumnsWithTypeAndName & expect_columns) { Blocks actual_blocks; Block 
except_block(expect_columns);
@@ -108,7 +109,6 @@ void readBlock(BlockInputStreamPtr stream, const ColumnsWithTypeAndName & expect
     Block actual_block = mergeBlocks(actual_blocks);
     ASSERT_BLOCK_EQ(except_block, actual_block);
 }
-} // namespace
 
 void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, std::unordered_map<String, ColumnsWithTypeAndName> & source_columns_map, const ColumnsWithTypeAndName & expect_columns, size_t concurrency)
 {
@@ -117,7 +117,7 @@ void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & requ
     context.context.setDAGContext(&dag_context);
     // Currently, don't care about regions information in tests.
     DAGQuerySource dag(context.context);
-    readBlock(executeQuery(dag, context.context, false, QueryProcessingStage::Complete).in, expect_columns);
+    readAndAssertBlock(executeQuery(dag, context.context, false, QueryProcessingStage::Complete).in, expect_columns);
 }
 
 void ExecutorTest::executeStreams(const std::shared_ptr<tipb::DAGRequest> & request, const ColumnsWithTypeAndName & expect_columns, size_t concurrency)
diff --git a/dbms/src/TestUtils/ExecutorTestUtils.h b/dbms/src/TestUtils/ExecutorTestUtils.h
index 977b46abbd2..56a07085e50 100644
--- a/dbms/src/TestUtils/ExecutorTestUtils.h
+++ b/dbms/src/TestUtils/ExecutorTestUtils.h
@@ -86,6 +86,8 @@ class ExecutorTest : public ::testing::Test
         return createColumn(v, name);
     }
 
+    static void readAndAssertBlock(BlockInputStreamPtr stream, const ColumnsWithTypeAndName & expect_columns);
+
 protected:
     MockDAGRequestContext context;
     std::unique_ptr<DAGContext> dag_context_ptr;
diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h
index 88d98158b74..95551cdfc9e 100644
--- a/dbms/src/TestUtils/mockExecutor.h
+++ b/dbms/src/TestUtils/mockExecutor.h
@@ -166,6 +166,7 @@ ASTPtr buildOrderByItemList(MockOrderByItems order_by_items);
 #define col(name) buildColumn((name))
 #define lit(field) buildLiteral((field))
+#define concat(expr1, expr2) makeASTFunction("concat", (expr1), (expr2))
 #define eq(expr1, expr2) makeASTFunction("equals", (expr1), (expr2))
 #define Not_eq(expr1, expr2) makeASTFunction("notEquals", (expr1), (expr2))
 #define lt(expr1, expr2) makeASTFunction("less", (expr1), (expr2))
diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test
index 02259a90681..7af5fef3f89 100644
--- a/tests/fullstack-test/mpp/mpp_fail.test
+++ b/tests/fullstack-test/mpp/mpp_fail.test
@@ -21,6 +21,7 @@ mysql> insert into test.t values(1,'a'),(2,'b'),(3,'c')
 mysql> alter table test.t set tiflash replica 1
 func> wait_table test t
+mysql> analyze table test.t
 # Data.