Skip to content

Commit

Permalink
Refine merge agg stream (#6793)
Browse files Browse the repository at this point in the history
ref #5900
  • Loading branch information
SeaRise authored Feb 14, 2023
1 parent f5f17bb commit 34fc70f
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 841 deletions.
13 changes: 12 additions & 1 deletion dbms/src/DataStreams/AggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <DataStreams/MergingAndConvertingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>

namespace DB
{
Expand Down Expand Up @@ -41,7 +43,16 @@ Block AggregatingBlockInputStream::readImpl()
if (!aggregator.hasSpilledData())
{
ManyAggregatedDataVariants many_data{data_variants};
impl = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
auto merging_buckets = aggregator.mergeAndConvertToBlocks(many_data, final, 1);
if (!merging_buckets)
{
impl = std::make_unique<NullBlockInputStream>(aggregator.getHeader(final));
}
else
{
RUNTIME_CHECK(1 == merging_buckets->getConcurrency());
impl = std::make_unique<MergingAndConvertingBlockInputStream>(merging_buckets, 0, log->identifier());
}
}
else
{
Expand Down
55 changes: 0 additions & 55 deletions dbms/src/DataStreams/MergingAggregatedBlockInputStream.cpp

This file was deleted.

58 changes: 0 additions & 58 deletions dbms/src/DataStreams/MergingAggregatedBlockInputStream.h

This file was deleted.

Loading

0 comments on commit 34fc70f

Please sign in to comment.