Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed Feb 6, 2023
1 parent 88d4c8f commit 426315f
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 9 deletions.
13 changes: 7 additions & 6 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ bool isFinalAggMode(const tipb::Expr & expr)
return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode;
}

bool isAllowToUseTwoLevelGroupBy(size_t agg_concurrency, const Settings & settings)
bool isAllowToUseTwoLevelGroupBy(size_t before_agg_streams_size, const Settings & settings)
{
/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with temporary data stored on disk, and that data needs to be merged in a memory-efficient way.
*/
return agg_concurrency > 1 || settings.max_bytes_before_external_group_by != 0;
return before_agg_streams_size > 1 || settings.max_bytes_before_external_group_by != 0;
}
} // namespace

Expand Down Expand Up @@ -76,7 +76,8 @@ bool isGroupByCollationSensitive(const Context & context)
Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t agg_concurrency,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
Expand All @@ -91,7 +92,7 @@ Aggregator::Params buildParams(

const Settings & settings = context.getSettingsRef();

bool allow_to_use_two_level_group_by = isAllowToUseTwoLevelGroupBy(agg_concurrency, settings);
bool allow_to_use_two_level_group_by = isAllowToUseTwoLevelGroupBy(before_agg_streams_size, settings);
auto total_two_level_threshold_bytes = allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0);

bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });
Expand All @@ -103,8 +104,8 @@ Aggregator::Params buildParams(
/// Do not use the average value for the key count threshold, because for randomly distributed data the key count
/// in every thread should be almost the same.
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
getAverageThreshold(total_two_level_threshold_bytes, agg_concurrency),
getAverageThreshold(settings.max_bytes_before_external_group_by, agg_concurrency),
getAverageThreshold(total_two_level_threshold_bytes, agg_streams_size),
getAverageThreshold(settings.max_bytes_before_external_group_by, agg_streams_size),
!is_final_agg,
spill_config,
context.getSettingsRef().max_block_size,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ bool isGroupByCollationSensitive(const Context & context);
Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t agg_concurrency,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ void DAGQueryBlockInterpreter::executeAggregation(
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
std::min(max_streams, pipeline.streams.size()),
pipeline.streams.size(),
enable_fine_grained_shuffle ? pipeline.streams.size() : 1,
key_names,
collators,
aggregate_descriptions,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
auto params = AggregationInterpreterHelper::buildParams(
context,
before_agg_header,
std::min(max_streams, pipeline.streams.size()),
pipeline.streams.size(),
fine_grained_shuffle.enable() ? pipeline.streams.size() : 1,
aggregation_keys,
aggregation_collators,
aggregate_descriptions,
Expand Down

0 comments on commit 426315f

Please sign in to comment.