diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
new file mode 100644
index 00000000000..c6b2687b2d3
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
@@ -0,0 +1,113 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace
+{
+bool isFinalAggMode(const tipb::Expr & expr)
+{
+    if (!expr.has_aggfuncmode())
+        /// set default value to true to make it compatible with old version of TiDB since before this
+        /// change, all the aggregation in TiFlash is treated as final aggregation
+        return true;
+    return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode;
+}
+} // namespace
+
+Aggregator::Params AggregationInterpreterHelper::buildAggregatorParams(
+    const Context & context,
+    const Block & before_agg_header,
+    size_t before_agg_streams_size,
+    const Names & key_names,
+    const TiDB::TiDBCollators & collators,
+    const AggregateDescriptions & aggregate_descriptions,
+    bool is_final_agg)
+{
+    ColumnNumbers keys;
+    for (const auto & name : key_names)
+    {
+        keys.push_back(before_agg_header.getPositionByName(name));
+    }
+
+    const Settings & settings = context.getSettingsRef();
+
+    /** Two-level aggregation is useful in two cases:
+      * 1. Parallel aggregation is done, and the results should be merged in parallel.
+      * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
+      */
+    bool allow_to_use_two_level_group_by = before_agg_streams_size > 1 || settings.max_bytes_before_external_group_by != 0;
+    bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });
+
+    return Aggregator::Params(
+        before_agg_header,
+        keys,
+        aggregate_descriptions,
+        false,
+        settings.max_rows_to_group_by,
+        settings.group_by_overflow_mode,
+        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
+        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
+        settings.max_bytes_before_external_group_by,
+        !is_final_agg,
+        context.getTemporaryPath(),
+        has_collator ? collators : TiDB::dummy_collators);
+}
+
+void AggregationInterpreterHelper::fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header)
+{
+    for (auto & descr : aggregate_descriptions)
+    {
+        if (descr.arguments.empty())
+        {
+            for (const auto & name : descr.argument_names)
+            {
+                descr.arguments.push_back(before_agg_header.getPositionByName(name));
+            }
+        }
+    }
+}
+
+bool AggregationInterpreterHelper::isFinalAgg(const tipb::Aggregation & aggregation)
+{
+    /// set default value to true to make it compatible with old version of TiDB since before this
+    /// change, all the aggregation in TiFlash is treated as final aggregation
+    bool is_final_agg = true;
+    if (aggregation.agg_func_size() > 0 && !isFinalAggMode(aggregation.agg_func(0)))
+    {
+        is_final_agg = false;
+    }
+
+    for (int i = 1; i < aggregation.agg_func_size(); ++i)
+    {
+        if (is_final_agg != isFinalAggMode(aggregation.agg_func(i)))
+            throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest);
+    }
+    return is_final_agg;
+}
+
+bool AggregationInterpreterHelper::isGroupByCollationSensitive(const Context & context)
+{
+    // todo now we can tell if the aggregation is final stage or partial stage,
+    // maybe we can do collation insensitive aggregation if the stage is partial.
+
+    /// collation sensitive group by is slower than normal group by, use normal group by by default
+    return context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
+}
+} // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
new file mode 100644
index 00000000000..b99ecce83d0
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
@@ -0,0 +1,45 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+struct AggregationInterpreterHelper
+{
+    static Aggregator::Params buildAggregatorParams(
+        const Context & context,
+        const Block & before_agg_header,
+        size_t before_agg_streams_size,
+        const Names & key_names,
+        const TiDB::TiDBCollators & collators,
+        const AggregateDescriptions & aggregate_descriptions,
+        bool is_final_agg);
+
+    // Look up each agg func argument's position in `before_agg_header` and store it in `aggregate_descriptions` (entries whose `arguments` are already set are left untouched).
+    static void fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header);
+
+    static bool isFinalAgg(const tipb::Aggregation & aggregation);
+
+    static bool isGroupByCollationSensitive(const Context & context);
+};
+} // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 51cd1bf671f..5d722b1f381 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -31,10 +31,11 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -103,15 +104,6 @@ bool addExtraCastsAfterTs(
     return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan);
 }
 
-bool isFinalAgg(const tipb::Expr & expr)
-{
-    if (!expr.has_aggfuncmode())
-        /// set default value to true to make it compatible with old version of TiDB since before this
-        /// change, all the aggregation in TiFlash is treated as final aggregation
-        return true;
-    return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode;
-}
-
 AnalysisResult analyzeExpressions(
     Context & context,
     DAGExpressionAnalyzer & analyzer,
@@ -133,27 +125,15 @@ AnalysisResult analyzeExpressions(
     // There will be either Agg...
     if (query_block.aggregation)
     {
-        /// set default value to true to make it compatible with old version of TiDB since before this
-        /// change, all the aggregation in TiFlash is treated as final aggregation
-        res.is_final_agg = true;
-        const auto & aggregation = query_block.aggregation->aggregation();
-        if (aggregation.agg_func_size() > 0 && !isFinalAgg(aggregation.agg_func(0)))
-            res.is_final_agg = false;
-        for (int i = 1; i < aggregation.agg_func_size(); i++)
-        {
-            if (res.is_final_agg != isFinalAgg(aggregation.agg_func(i)))
-                throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest);
-        }
-        // todo now we can tell if the aggregation is final stage or partial stage, maybe we can do collation insensitive
-        // aggregation if the stage is partial
-        bool group_by_collation_sensitive =
-            /// collation sensitive group by is slower than normal group by, use normal group by by default
-            context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
+        res.is_final_agg = AggregationInterpreterHelper::isFinalAgg(query_block.aggregation->aggregation());
 
         std::tie(res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.before_aggregation) = analyzer.appendAggregation(
             chain,
             query_block.aggregation->aggregation(),
-            group_by_collation_sensitive);
+            AggregationInterpreterHelper::isGroupByCollationSensitive(context));
+
+        // `res.before_aggregation` has already been finalized, so resolving column indexes from its sample block here is safe.
+        AggregationInterpreterHelper::fillArgColumnNumbers(res.aggregate_descriptions, res.before_aggregation->getSampleBlock());
 
         if (query_block.having != nullptr)
         {
@@ -760,56 +740,27 @@ void DAGQueryBlockInterpreter::executeWhere(DAGPipeline & pipeline, const Expres
 void DAGQueryBlockInterpreter::executeAggregation(
     DAGPipeline & pipeline,
     const ExpressionActionsPtr & expression_actions_ptr,
-    Names & key_names,
-    TiDB::TiDBCollators & collators,
-    AggregateDescriptions & aggregate_descriptions,
+    const Names & key_names,
+    const TiDB::TiDBCollators & collators,
+    const AggregateDescriptions & aggregate_descriptions,
     bool is_final_agg)
 {
     pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, expression_actions_ptr, log->identifier()); });
 
-    Block header = pipeline.firstStream()->getHeader();
-    ColumnNumbers keys;
-    for (const auto & name : key_names)
-    {
-        keys.push_back(header.getPositionByName(name));
-    }
-    for (auto & descr : aggregate_descriptions)
-    {
-        if (descr.arguments.empty())
-        {
-            for (const auto & name : descr.argument_names)
-            {
-                descr.arguments.push_back(header.getPositionByName(name));
-            }
-        }
-    }
-
-    const Settings & settings = context.getSettingsRef();
-
-    /** Two-level aggregation is useful in two cases:
-      * 1. Parallel aggregation is done, and the results should be merged in parallel.
-      * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
-      */
-    bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0;
-    bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });
-
-    Aggregator::Params params(
-        header,
-        keys,
+    assert(!pipeline.streams.empty()); // firstStream() below needs at least one stream; more than one is optional (see the size() > 1 check below)
+    Aggregator::Params params = AggregationInterpreterHelper::buildAggregatorParams(
+        context,
+        pipeline.firstStream()->getHeader(),
+        pipeline.streams.size(),
+        key_names,
+        collators,
         aggregate_descriptions,
-        false,
-        settings.max_rows_to_group_by,
-        settings.group_by_overflow_mode,
-        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
-        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
-        settings.max_bytes_before_external_group_by,
-        !is_final_agg,
-        context.getTemporaryPath(),
-        has_collator ? collators : TiDB::dummy_collators);
+        is_final_agg);
 
     /// If there are several sources, then we perform parallel aggregation
     if (pipeline.streams.size() > 1)
     {
+        const Settings & settings = context.getSettingsRef();
         BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
         pipeline.firstStream() = std::make_shared(
             pipeline.streams,
@@ -843,7 +794,6 @@ void DAGQueryBlockInterpreter::executeAggregation(
             log->identifier());
         recordProfileStreams(pipeline, query_block.aggregation_name);
     }
-    // add cast
 }
 
 void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr)
@@ -1146,40 +1096,21 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
     assert(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr);
     /// exchange sender should be at the top of operators
     const auto & exchange_sender = query_block.exchange_sender->exchange_sender();
-    /// get partition column ids
-    const auto & part_keys = exchange_sender.partition_keys();
-    std::vector partition_col_id;
-    TiDB::TiDBCollators collators;
-    /// in case TiDB is an old version, it has no collation info
-    bool has_collator_info = exchange_sender.types_size() != 0;
-    if (has_collator_info && part_keys.size() != exchange_sender.types_size())
-    {
-        throw TiFlashException(
-            std::string(__PRETTY_FUNCTION__) + ": Invalid plan, in ExchangeSender, the length of partition_keys and types is not the same when TiDB new collation is enabled",
-            Errors::Coprocessor::BadRequest);
-    }
-    for (int i = 0; i < part_keys.size(); ++i)
-    {
-        const auto & expr = part_keys[i];
-        assert(isColumnExpr(expr));
-        auto column_index = decodeDAGInt64(expr.val());
-        partition_col_id.emplace_back(column_index);
-        if (has_collator_info && removeNullable(getDataTypeByFieldTypeForComputingLayer(expr.field_type()))->isString())
-        {
-            collators.emplace_back(getCollatorFromFieldType(exchange_sender.types(i)));
-        }
-        else
-        {
-            collators.emplace_back(nullptr);
-        }
-    }
+
+    // Can't use auto [partition_col_ids, partition_col_collators],
+    // because of `Structured bindings cannot be captured by lambda expressions. (until C++20)`
+    // https://en.cppreference.com/w/cpp/language/structured_binding
+    std::vector partition_col_ids;
+    TiDB::TiDBCollators partition_col_collators;
+    std::tie(partition_col_ids, partition_col_collators) = ExchangeSenderInterpreterHelper::genPartitionColIdsAndCollators(exchange_sender, log);
+
     int stream_id = 0;
     pipeline.transform([&](auto & stream) {
         // construct writer
         std::unique_ptr response_writer = std::make_unique>(
             context.getDAGContext()->tunnel_set,
-            partition_col_id,
-            collators,
+            partition_col_ids,
+            partition_col_collators,
             exchange_sender.tp(),
             context.getSettingsRef().dag_records_per_chunk,
             context.getSettingsRef().batch_send_min_limit,
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
index 35627cd19ee..13bbfbb66c5 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
@@ -87,9 +87,9 @@ class DAGQueryBlockInterpreter
     void executeAggregation(
         DAGPipeline & pipeline,
         const ExpressionActionsPtr & expression_actions_ptr,
-        Names & key_names,
-        TiDB::TiDBCollators & collators,
-        AggregateDescriptions & aggregate_descriptions,
+        const Names & key_names,
+        const TiDB::TiDBCollators & collators,
+        const AggregateDescriptions & aggregate_descriptions,
         bool is_final_agg);
     void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols);
     void handleExchangeSender(DAGPipeline & pipeline);
diff --git a/dbms/src/Flash/Coprocessor/ExchangeSenderInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/ExchangeSenderInterpreterHelper.cpp
new file mode 100644
index 00000000000..4ab0a1c4992
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/ExchangeSenderInterpreterHelper.cpp
@@ -0,0 +1,59 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB::ExchangeSenderInterpreterHelper
+{
+std::pair, TiDB::TiDBCollators> genPartitionColIdsAndCollators(
+    const tipb::ExchangeSender & exchange_sender,
+    const LoggerPtr & log)
+{
+    /// get partition column ids
+    const auto & part_keys = exchange_sender.partition_keys();
+    std::vector partition_col_ids;
+    TiDB::TiDBCollators partition_col_collators;
+    /// in case TiDB is an old version, it has no collation info
+    bool has_collator_info = exchange_sender.types_size() != 0;
+    if (has_collator_info && part_keys.size() != exchange_sender.types_size())
+    {
+        throw TiFlashException(
+            fmt::format("{}: Invalid plan, in ExchangeSender, the length of partition_keys and types is not the same when TiDB new collation is enabled", __PRETTY_FUNCTION__),
+            Errors::Coprocessor::BadRequest);
+    }
+    for (int i = 0; i < part_keys.size(); ++i)
+    {
+        const auto & expr = part_keys[i];
+        RUNTIME_ASSERT(isColumnExpr(expr), log, "part_key of ExchangeSender must be column");
+        auto column_index = decodeDAGInt64(expr.val());
+        partition_col_ids.emplace_back(column_index);
+        if (has_collator_info && removeNullable(getDataTypeByFieldTypeForComputingLayer(expr.field_type()))->isString())
+        {
+            partition_col_collators.emplace_back(getCollatorFromFieldType(exchange_sender.types(i)));
+        }
+        else
+        {
+            partition_col_collators.emplace_back(nullptr);
+        }
+    }
+    return {std::move(partition_col_ids), std::move(partition_col_collators)}; // move the locals into the pair instead of copying both vectors
+}
+} // namespace DB::ExchangeSenderInterpreterHelper
diff --git a/dbms/src/Flash/Coprocessor/ExchangeSenderInterpreterHelper.h b/dbms/src/Flash/Coprocessor/ExchangeSenderInterpreterHelper.h
new file mode 100644
index 00000000000..e1cff75a83d
--- /dev/null
+++ b/dbms/src/Flash/Coprocessor/ExchangeSenderInterpreterHelper.h
@@ -0,0 +1,27 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB::ExchangeSenderInterpreterHelper
+{
+std::pair, TiDB::TiDBCollators> genPartitionColIdsAndCollators(
+    const tipb::ExchangeSender & exchange_sender,
+    const LoggerPtr & log);
+} // namespace DB::ExchangeSenderInterpreterHelper