diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp index e1a499aa848..1d5109fc410 100644 --- a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp +++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp @@ -31,13 +31,13 @@ bool isFinalAggMode(const tipb::Expr & expr) return expr.aggfuncmode() == tipb::AggFunctionMode::FinalMode || expr.aggfuncmode() == tipb::AggFunctionMode::CompleteMode; } -bool isAllowToUseTwoLevelGroupBy(size_t before_agg_streams_size, const Settings & settings) +bool isAllowToUseTwoLevelGroupBy(size_t agg_concurrency, const Settings & settings) { /** Two-level aggregation is useful in two cases: * 1. Parallel aggregation is done, and the results should be merged in parallel. * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way. */ - return before_agg_streams_size > 1 || settings.max_bytes_before_external_group_by != 0; + return agg_concurrency > 1 || settings.max_bytes_before_external_group_by != 0; } } // namespace diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2e0a54a3e4f..1c3cc5a490f 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -396,7 +396,7 @@ void DAGQueryBlockInterpreter::executeAggregation( auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, - pipeline.streams.size(), + std::min(max_streams, pipeline.streams.size()), key_names, collators, aggregate_descriptions, diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp index 90a4f441e35..25c851eae78 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp @@ -98,7 +98,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont auto params = AggregationInterpreterHelper::buildParams( context, before_agg_header, - pipeline.streams.size(), + std::min(max_streams, pipeline.streams.size()), aggregation_keys, aggregation_collators, aggregate_descriptions,