Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid spill empty block in agg #6891

Merged
merged 3 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ Aggregator::Aggregator(const Params & params_, const String & req_id)
}

method_chosen = chooseAggregationMethod();
RUNTIME_CHECK_MSG(method_chosen != AggregatedDataVariants::Type::EMPTY, "Invalid aggregation method");
if (AggregatedDataVariants::isConvertibleToTwoLevel(method_chosen))
{
/// for aggregation, the input block is sorted by bucket number
Expand Down Expand Up @@ -725,7 +726,7 @@ bool Aggregator::executeOnBlock(
result.aggregator = this;

/// How to perform the aggregation?
if (result.empty())
if (!result.inited())
{
result.init(method_chosen);
result.keys_size = params.keys_size;
Expand Down Expand Up @@ -812,7 +813,7 @@ bool Aggregator::executeOnBlock(
/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation is supported.
*/
if (max_bytes_before_external_group_by
if (max_bytes_before_external_group_by && result_size > 0
&& (result.isTwoLevel() || result.isConvertibleToTwoLevel())
&& result_size_bytes > max_bytes_before_external_group_by)
{
Expand Down Expand Up @@ -2105,7 +2106,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const

void Aggregator::destroyAllAggregateStates(AggregatedDataVariants & result)
{
if (result.empty())
if (!result.inited())
return;

LOG_TRACE(log, "Destroying aggregate states");
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,9 +686,13 @@ struct AggregatedDataVariants : private boost::noncopyable
: aggregates_pools(1, std::make_shared<Arena>())
, aggregates_pool(aggregates_pools.back().get())
{}
bool inited() const
{
return type != Type::EMPTY;
}
bool empty() const
{
return type == Type::EMPTY;
return size() == 0;
}
void invalidate()
{
Expand Down