Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] refine executeAggregation and handleExchangeSender #4629

Closed
wants to merge 13 commits into from
113 changes: 113 additions & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Core/ColumnNumbers.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/DAGContext.h>

namespace DB
{
namespace
{
bool isFinalAggMode(const tipb::Expr & expr)
{
if (!expr.has_aggfuncmode())
/// set default value to true to make it compatible with old version of TiDB since before this
/// change, all the aggregation in TiFlash is treated as final aggregation
return true;
return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode;
}
} // namespace

Aggregator::Params AggregationInterpreterHelper::buildAggregatorParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg)
{
ColumnNumbers keys;
for (const auto & name : key_names)
{
keys.push_back(before_agg_header.getPositionByName(name));
}

const Settings & settings = context.getSettingsRef();

/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
bool allow_to_use_two_level_group_by = before_agg_streams_size > 1 || settings.max_bytes_before_external_group_by != 0;
bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });

return Aggregator::Params(
before_agg_header,
keys,
aggregate_descriptions,
false,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
!is_final_agg,
context.getTemporaryPath(),
has_collator ? collators : TiDB::dummy_collators);
}

void AggregationInterpreterHelper::fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header)
{
for (auto & descr : aggregate_descriptions)
{
if (descr.arguments.empty())
{
for (const auto & name : descr.argument_names)
{
descr.arguments.push_back(before_agg_header.getPositionByName(name));
}
}
}
}

bool AggregationInterpreterHelper::isFinalAgg(const tipb::Aggregation & aggregation)
{
/// set default value to true to make it compatible with old version of TiDB since before this
/// change, all the aggregation in TiFlash is treated as final aggregation
bool is_final_agg = true;
if (aggregation.agg_func_size() > 0 && !isFinalAggMode(aggregation.agg_func(0)))
{
is_final_agg = false;
}

for (int i = 1; i < aggregation.agg_func_size(); ++i)
{
if (is_final_agg != isFinalAggMode(aggregation.agg_func(i)))
throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest);
}
return is_final_agg;
}

bool AggregationInterpreterHelper::isGroupByCollationSensitive(const Context & context)
{
// todo now we can tell if the aggregation is final stage or partial stage,
// maybe we can do collation insensitive aggregation if the stage is partial.

/// collation sensitive group by is slower than normal group by, use normal group by by default
return context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
}
} // namespace DB
45 changes: 45 additions & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/Block.h>
#include <Core/Names.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/Collator.h>
#include <tipb/executor.pb.h>

namespace DB
{
struct AggregationInterpreterHelper
{
static Aggregator::Params buildAggregatorParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg);

// get and fill the column number of agg func's arguments to aggregate_descriptions.
static void fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header);

static bool isFinalAgg(const tipb::Aggregation & aggregation);

static bool isGroupByCollationSensitive(const Context & context);
};
} // namespace DB
127 changes: 29 additions & 98 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Mpp/ExchangeReceiver.h>
Expand Down Expand Up @@ -103,15 +104,6 @@ bool addExtraCastsAfterTs(
return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan);
}

bool isFinalAgg(const tipb::Expr & expr)
{
if (!expr.has_aggfuncmode())
/// set default value to true to make it compatible with old version of TiDB since before this
/// change, all the aggregation in TiFlash is treated as final aggregation
return true;
return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode;
}

AnalysisResult analyzeExpressions(
Context & context,
DAGExpressionAnalyzer & analyzer,
Expand All @@ -133,27 +125,15 @@ AnalysisResult analyzeExpressions(
// There will be either Agg...
if (query_block.aggregation)
{
/// set default value to true to make it compatible with old version of TiDB since before this
/// change, all the aggregation in TiFlash is treated as final aggregation
res.is_final_agg = true;
const auto & aggregation = query_block.aggregation->aggregation();
if (aggregation.agg_func_size() > 0 && !isFinalAgg(aggregation.agg_func(0)))
res.is_final_agg = false;
for (int i = 1; i < aggregation.agg_func_size(); i++)
{
if (res.is_final_agg != isFinalAgg(aggregation.agg_func(i)))
throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest);
}
// todo now we can tell if the aggregation is final stage or partial stage, maybe we can do collation insensitive
// aggregation if the stage is partial
bool group_by_collation_sensitive =
/// collation sensitive group by is slower than normal group by, use normal group by by default
context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
res.is_final_agg = AggregationInterpreterHelper::isFinalAgg(query_block.aggregation->aggregation());

std::tie(res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.before_aggregation) = analyzer.appendAggregation(
chain,
query_block.aggregation->aggregation(),
group_by_collation_sensitive);
AggregationInterpreterHelper::isGroupByCollationSensitive(context));

// `res.before_aggregation` has called `finalize`, get column index in here is safe.
AggregationInterpreterHelper::fillArgColumnNumbers(res.aggregate_descriptions, res.before_aggregation->getSampleBlock());

if (query_block.having != nullptr)
{
Expand Down Expand Up @@ -760,56 +740,27 @@ void DAGQueryBlockInterpreter::executeWhere(DAGPipeline & pipeline, const Expres
void DAGQueryBlockInterpreter::executeAggregation(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expression_actions_ptr,
Names & key_names,
TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg)
{
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, expression_actions_ptr, log->identifier()); });

Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
{
keys.push_back(header.getPositionByName(name));
}
for (auto & descr : aggregate_descriptions)
{
if (descr.arguments.empty())
{
for (const auto & name : descr.argument_names)
{
descr.arguments.push_back(header.getPositionByName(name));
}
}
}

const Settings & settings = context.getSettingsRef();

/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0;
bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });

Aggregator::Params params(
header,
keys,
assert(pipeline.hasMoreThanOneStream());
Aggregator::Params params = AggregationInterpreterHelper::buildAggregatorParams(
context,
pipeline.firstStream()->getHeader(),
pipeline.streams.size(),
key_names,
collators,
aggregate_descriptions,
false,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
!is_final_agg,
context.getTemporaryPath(),
has_collator ? collators : TiDB::dummy_collators);
is_final_agg);

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
Expand Down Expand Up @@ -843,7 +794,6 @@ void DAGQueryBlockInterpreter::executeAggregation(
log->identifier());
recordProfileStreams(pipeline, query_block.aggregation_name);
}
// add cast
}

void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr)
Expand Down Expand Up @@ -1146,40 +1096,21 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
assert(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr);
/// exchange sender should be at the top of operators
const auto & exchange_sender = query_block.exchange_sender->exchange_sender();
/// get partition column ids
const auto & part_keys = exchange_sender.partition_keys();
std::vector<Int64> partition_col_id;
TiDB::TiDBCollators collators;
/// in case TiDB is an old version, it has no collation info
bool has_collator_info = exchange_sender.types_size() != 0;
if (has_collator_info && part_keys.size() != exchange_sender.types_size())
{
throw TiFlashException(
std::string(__PRETTY_FUNCTION__) + ": Invalid plan, in ExchangeSender, the length of partition_keys and types is not the same when TiDB new collation is enabled",
Errors::Coprocessor::BadRequest);
}
for (int i = 0; i < part_keys.size(); ++i)
{
const auto & expr = part_keys[i];
assert(isColumnExpr(expr));
auto column_index = decodeDAGInt64(expr.val());
partition_col_id.emplace_back(column_index);
if (has_collator_info && removeNullable(getDataTypeByFieldTypeForComputingLayer(expr.field_type()))->isString())
{
collators.emplace_back(getCollatorFromFieldType(exchange_sender.types(i)));
}
else
{
collators.emplace_back(nullptr);
}
}

// Can't use auto [partition_col_ids, partition_col_collators],
// because of `Structured bindings cannot be captured by lambda expressions. (until C++20)`
// https://en.cppreference.com/w/cpp/language/structured_binding
std::vector<Int64> partition_col_ids;
TiDB::TiDBCollators partition_col_collators;
std::tie(partition_col_ids, partition_col_collators) = ExchangeSenderInterpreterHelper::genPartitionColIdsAndCollators(exchange_sender, log);

int stream_id = 0;
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>(
context.getDAGContext()->tunnel_set,
partition_col_id,
collators,
partition_col_ids,
partition_col_collators,
exchange_sender.tp(),
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class DAGQueryBlockInterpreter
void executeAggregation(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expression_actions_ptr,
Names & key_names,
TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg);
void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols);
void handleExchangeSender(DAGPipeline & pipeline);
Expand Down
Loading