Skip to content

Commit

Permalink
Refine handle agg and exchange sender (#4721)
Browse files Browse the repository at this point in the history
ref #4118
  • Loading branch information
SeaRise authored Apr 25, 2022
1 parent d752d0e commit c751073
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 100 deletions.
117 changes: 117 additions & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Core/ColumnNumbers.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/Context.h>

namespace DB::AggregationInterpreterHelper
{
namespace
{
/// Tells whether a single aggregate function expression is evaluated in a final-stage mode,
/// i.e. its mode is either FinalMode or CompleteMode.
bool isFinalAggMode(const tipb::Expr & expr)
{
    if (expr.has_aggfuncmode())
    {
        const auto mode = expr.aggfuncmode();
        return mode == tipb::AggFunctionMode::FinalMode || mode == tipb::AggFunctionMode::CompleteMode;
    }

    /// No mode was sent: default to true to stay compatible with old versions of TiDB, since
    /// before this change all the aggregation in TiFlash was treated as final aggregation.
    return true;
}

/// Decides whether two-level aggregation may be used.
///
/// Two-level aggregation is useful in two cases:
///  1. Parallel aggregation is done, and the results should be merged in parallel.
///  2. An aggregation is done with store of temporary data on the disk, and they need to be
///     merged in a memory efficient way.
bool isAllowToUseTwoLevelGroupBy(size_t before_agg_streams_size, const Settings & settings)
{
    /// Case 1: more than one input stream feeds the aggregation.
    if (before_agg_streams_size > 1)
        return true;

    /// Case 2: external (on-disk) group by is enabled, i.e. the byte limit is non-zero.
    return settings.max_bytes_before_external_group_by != 0;
}
} // namespace

/// Determines whether the whole aggregation is a final aggregation.
///
/// The mode of the first aggregate function is the reference; every other aggregate function
/// must agree with it, otherwise a TiFlashException (BadRequest) is thrown.
bool isFinalAgg(const tipb::Aggregation & aggregation)
{
    const int agg_func_count = aggregation.agg_func_size();

    /// With no aggregate function at all, default to true to stay compatible with old versions
    /// of TiDB, since before this change all the aggregation in TiFlash was treated as final.
    if (agg_func_count <= 0)
        return true;

    const bool first_is_final = isFinalAggMode(aggregation.agg_func(0));
    for (int idx = 1; idx < agg_func_count; ++idx)
    {
        const bool current_is_final = isFinalAggMode(aggregation.agg_func(idx));
        if (unlikely(first_is_final != current_is_final))
            throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest);
    }
    return first_is_final;
}

/// Tells whether group by has to be collation sensitive for the current query.
///
/// Collation sensitive group by is slower than normal group by, so normal group by is used
/// unless the `group_by_collation_sensitive` setting is on or the query runs as an MPP task.
bool isGroupByCollationSensitive(const Context & context)
{
    // todo now we can tell if the aggregation is final stage or partial stage,
    // maybe we can do collation insensitive aggregation if the stage is partial

    if (context.getSettingsRef().group_by_collation_sensitive)
        return true;

    return context.getDAGContext()->isMPPTask();
}

/// Assembles the Aggregator::Params for an aggregation that reads blocks shaped like
/// `before_agg_header`.
///
/// - `key_names` are resolved to column positions in `before_agg_header`
///   (via Block::getPositionByName).
/// - The two-level group by thresholds are taken from the settings only when two-level
///   aggregation is allowed (more than one input stream, or external group by enabled);
///   otherwise both thresholds are passed as 0.
/// - `!is_final_agg` is forwarded to Aggregator::Params.
/// - If every entry of `collators` is nullptr, TiDB::dummy_collators is passed instead.
Aggregator::Params buildParams(
    const Context & context,
    const Block & before_agg_header,
    size_t before_agg_streams_size,
    const Names & key_names,
    const TiDB::TiDBCollators & collators,
    const AggregateDescriptions & aggregate_descriptions,
    bool is_final_agg)
{
    /// The number of keys is known up front, so allocate once instead of growing per push_back.
    ColumnNumbers keys;
    keys.reserve(key_names.size());
    for (const auto & key_name : key_names)
        keys.push_back(before_agg_header.getPositionByName(key_name));

    const Settings & settings = context.getSettingsRef();

    const bool allow_to_use_two_level_group_by = isAllowToUseTwoLevelGroupBy(before_agg_streams_size, settings);

    /// Only hand the real collators over when at least one of them is actually set.
    const bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & collator) { return collator != nullptr; });

    return Aggregator::Params(
        before_agg_header,
        keys,
        aggregate_descriptions,
        false,
        settings.max_rows_to_group_by,
        settings.group_by_overflow_mode,
        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
        settings.max_bytes_before_external_group_by,
        !is_final_agg,
        context.getTemporaryPath(),
        has_collator ? collators : TiDB::dummy_collators);
}

/// For every aggregate description whose `arguments` is still empty, fills `arguments` with the
/// positions of its `argument_names` in `before_agg_header`.
/// Descriptions that already have `arguments` are left untouched.
void fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header)
{
    for (auto & descr : aggregate_descriptions)
    {
        if (!descr.arguments.empty())
            continue;

        /// `arguments` is empty here and will receive exactly one entry per argument name,
        /// so allocate once instead of growing per push_back.
        descr.arguments.reserve(descr.argument_names.size());
        for (const auto & arg_name : descr.argument_names)
            descr.arguments.push_back(before_agg_header.getPositionByName(arg_name));
    }
}
} // namespace DB::AggregationInterpreterHelper
44 changes: 44 additions & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/Block.h>
#include <Core/Names.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Aggregator.h>
#include <tipb/executor.pb.h>

namespace DB
{
class Context;

/// Free helper functions used when building an aggregation from a tipb::Aggregation.
namespace AggregationInterpreterHelper
{
/// Returns whether `aggregation` is a final aggregation, judged by the mode of its aggregate
/// functions (FinalMode / CompleteMode, or no mode set at all, count as final).
/// Returns true when there is no aggregate function.
/// Throws TiFlashException (BadRequest) when the aggregate functions do not all agree.
bool isFinalAgg(const tipb::Aggregation & aggregation);

/// Returns true when the `group_by_collation_sensitive` setting is on or the DAG context of
/// `context` is an MPP task.
bool isGroupByCollationSensitive(const Context & context);

/// Builds the Aggregator::Params for an aggregation over blocks shaped like `before_agg_header`.
/// `key_names` are resolved to column positions in `before_agg_header`.
/// The two-level group by thresholds from the settings are only used when
/// `before_agg_streams_size` > 1 or external group by is enabled; otherwise 0 is passed.
/// When every collator in `collators` is nullptr, TiDB::dummy_collators is passed instead.
Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
bool is_final_agg);

/// For each description whose `arguments` is empty, fills `arguments` with the positions of its
/// `argument_names` in `before_agg_header`.
void fillArgColumnNumbers(AggregateDescriptions & aggregate_descriptions, const Block & before_agg_header);
} // namespace AggregationInterpreterHelper
} // namespace DB
119 changes: 21 additions & 98 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Mpp/ExchangeReceiver.h>
Expand Down Expand Up @@ -104,15 +105,6 @@ bool addExtraCastsAfterTs(
return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan);
}

/// Returns true when the aggregate function expression `expr` runs in FinalMode or
/// CompleteMode, or when it carries no aggregation mode at all.
bool isFinalAgg(const tipb::Expr & expr)
{
if (!expr.has_aggfuncmode())
/// set default value to true to make it compatible with old version of TiDB since before this
/// change, all the aggregation in TiFlash is treated as final aggregation
return true;
return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode;
}

AnalysisResult analyzeExpressions(
Context & context,
DAGExpressionAnalyzer & analyzer,
Expand All @@ -134,27 +126,12 @@ AnalysisResult analyzeExpressions(
// There will be either Agg...
if (query_block.aggregation)
{
/// set default value to true to make it compatible with old version of TiDB since before this
/// change, all the aggregation in TiFlash is treated as final aggregation
res.is_final_agg = true;
const auto & aggregation = query_block.aggregation->aggregation();
if (aggregation.agg_func_size() > 0 && !isFinalAgg(aggregation.agg_func(0)))
res.is_final_agg = false;
for (int i = 1; i < aggregation.agg_func_size(); i++)
{
if (res.is_final_agg != isFinalAgg(aggregation.agg_func(i)))
throw TiFlashException("Different aggregation mode detected", Errors::Coprocessor::BadRequest);
}
// todo now we can tell if the aggregation is final stage or partial stage, maybe we can do collation insensitive
// aggregation if the stage is partial
bool group_by_collation_sensitive =
/// collation sensitive group by is slower than normal group by, use normal group by by default
context.getSettingsRef().group_by_collation_sensitive || context.getDAGContext()->isMPPTask();
res.is_final_agg = AggregationInterpreterHelper::isFinalAgg(query_block.aggregation->aggregation());

std::tie(res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions, res.before_aggregation) = analyzer.appendAggregation(
chain,
query_block.aggregation->aggregation(),
group_by_collation_sensitive);
AggregationInterpreterHelper::isGroupByCollationSensitive(context));

if (query_block.having != nullptr)
{
Expand Down Expand Up @@ -777,56 +754,29 @@ void DAGQueryBlockInterpreter::executeWindow(
void DAGQueryBlockInterpreter::executeAggregation(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expression_actions_ptr,
Names & key_names,
TiDB::TiDBCollators & collators,
const Names & key_names,
const TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
bool is_final_agg)
{
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, expression_actions_ptr, log->identifier()); });

Block header = pipeline.firstStream()->getHeader();
ColumnNumbers keys;
for (const auto & name : key_names)
{
keys.push_back(header.getPositionByName(name));
}
for (auto & descr : aggregate_descriptions)
{
if (descr.arguments.empty())
{
for (const auto & name : descr.argument_names)
{
descr.arguments.push_back(header.getPositionByName(name));
}
}
}

const Settings & settings = context.getSettingsRef();

/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0;
bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });
Block before_agg_header = pipeline.firstStream()->getHeader();

Aggregator::Params params(
header,
keys,
AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
pipeline.streams.size(),
key_names,
collators,
aggregate_descriptions,
false,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
!is_final_agg,
context.getTemporaryPath(),
has_collator ? collators : TiDB::dummy_collators);
is_final_agg);

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
Expand Down Expand Up @@ -860,7 +810,6 @@ void DAGQueryBlockInterpreter::executeAggregation(
log->identifier());
recordProfileStreams(pipeline, query_block.aggregation_name);
}
// add cast
}

void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr)
Expand Down Expand Up @@ -1205,44 +1154,18 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)

void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
{
/// only run in MPP
assert(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr);
RUNTIME_ASSERT(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr, log, "exchange_sender only run in MPP");
/// exchange sender should be at the top of operators
const auto & exchange_sender = query_block.exchange_sender->exchange_sender();
/// get partition column ids
const auto & part_keys = exchange_sender.partition_keys();
std::vector<Int64> partition_col_id;
TiDB::TiDBCollators collators;
/// in case TiDB is an old version, it has no collation info
bool has_collator_info = exchange_sender.types_size() != 0;
if (has_collator_info && part_keys.size() != exchange_sender.types_size())
{
throw TiFlashException(
std::string(__PRETTY_FUNCTION__) + ": Invalid plan, in ExchangeSender, the length of partition_keys and types is not the same when TiDB new collation is enabled",
Errors::Coprocessor::BadRequest);
}
for (int i = 0; i < part_keys.size(); ++i)
{
const auto & expr = part_keys[i];
assert(isColumnExpr(expr));
auto column_index = decodeDAGInt64(expr.val());
partition_col_id.emplace_back(column_index);
if (has_collator_info && removeNullable(getDataTypeByFieldTypeForComputingLayer(expr.field_type()))->isString())
{
collators.emplace_back(getCollatorFromFieldType(exchange_sender.types(i)));
}
else
{
collators.emplace_back(nullptr);
}
}
std::vector<Int64> partition_col_ids = ExchangeSenderInterpreterHelper::genPartitionColIds(exchange_sender);
TiDB::TiDBCollators partition_col_collators = ExchangeSenderInterpreterHelper::genPartitionColCollators(exchange_sender);
int stream_id = 0;
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>(
context.getDAGContext()->tunnel_set,
partition_col_id,
collators,
partition_col_ids,
partition_col_collators,
exchange_sender.tp(),
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class DAGQueryBlockInterpreter
void executeAggregation(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expression_actions_ptr,
Names & key_names,
TiDB::TiDBCollators & collators,
const Names & key_names,
const TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
bool is_final_agg);
void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols);
Expand Down
Loading

0 comments on commit c751073

Please sign in to comment.