Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AggregateFunction in SummingMergeTree #2566

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 56 additions & 31 deletions dbms/src/DataStreams/SummingSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnTuple.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/FieldVisitors.h>
Expand Down Expand Up @@ -74,7 +76,8 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
}
else
{
if (!column.type->isSummable())
bool isAggFunc = checkDataType<DataTypeAggregateFunction>(&*column.type);
if (!column.type->isSummable() && !isAggFunc)
{
column_numbers_not_to_aggregate.push_back(i);
continue;
Expand All @@ -93,8 +96,14 @@ SummingSortedBlockInputStream::SummingSortedBlockInputStream(
{
// Create aggregator to sum this column
AggregateDescription desc;
desc.isAggFuncType = isAggFunc;
desc.column_numbers = {i};
desc.init("sumWithOverflow", {column.type});

if (!isAggFunc)
{
desc.init("sumWithOverflow", {column.type});
}

columns_to_aggregate.emplace_back(std::move(desc));
}
else
Expand Down Expand Up @@ -193,28 +202,35 @@ void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & me
// Do not insert if the aggregation state hasn't been created
if (desc.created)
{
try
if (desc.isAggFuncType)
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);

/// Update zero status of current row
if (desc.column_numbers.size() == 1)
current_row_is_zero = false;
}
else
{
try
{
// Flag row as non-empty if at least one column number is non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);

/// Update zero status of current row
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number is non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
}
else
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
}
else
catch (...)
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
desc.destroyState();
throw;
}
}
catch (...)
{
desc.destroyState();
throw;
}
desc.destroyState();
}
else
Expand Down Expand Up @@ -258,7 +274,7 @@ Block SummingSortedBlockInputStream::readImpl()
for (auto & desc : columns_to_aggregate)
{
// Wrap aggregated columns in a tuple to match function signature
if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
if (!desc.isAggFuncType && checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
{
size_t tuple_size = desc.column_numbers.size();
MutableColumns tuple_columns(tuple_size);
Expand All @@ -277,7 +293,7 @@ Block SummingSortedBlockInputStream::readImpl()
/// Place aggregation results into block.
for (auto & desc : columns_to_aggregate)
{
if (checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
if (!desc.isAggFuncType && checkDataType<DataTypeTuple>(desc.function->getReturnType().get()))
{
/// Unpack tuple into block.
size_t tuple_size = desc.column_numbers.size();
Expand Down Expand Up @@ -465,23 +481,32 @@ void SummingSortedBlockInputStream::addRow(SortCursor & cursor)
{
for (auto & desc : columns_to_aggregate)
{
if (!desc.created)
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);

// Specialized case for unary functions
if (desc.column_numbers.size() == 1)
if (desc.isAggFuncType)
{
// desc.state is not used for AggregateFunction types
auto & col = cursor->all_columns[desc.column_numbers[0]];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
static_cast<ColumnAggregateFunction &>(*desc.merged_column).insertMergeFrom(*col, cursor->pos);
}
else
{
// Gather all source columns into a vector
ColumnRawPtrs columns(desc.column_numbers.size());
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
columns[i] = cursor->all_columns[desc.column_numbers[i]];
if (!desc.created)
throw Exception("Logical error in SummingSortedBlockInputStream, there are no description", ErrorCodes::LOGICAL_ERROR);

desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
// Specialized case for unary functions
if (desc.column_numbers.size() == 1)
{
auto & col = cursor->all_columns[desc.column_numbers[0]];
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, nullptr);
}
else
{
// Gather all source columns into a vector
ColumnRawPtrs columns(desc.column_numbers.size());
for (size_t i = 0; i < desc.column_numbers.size(); ++i)
columns[i] = cursor->all_columns[desc.column_numbers[i]];

desc.add_function(desc.function.get(), desc.state.data(), columns.data(), cursor->pos, nullptr);
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/DataStreams/SummingSortedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
MutableColumnPtr merged_column;
std::vector<char> state;
bool created = false;
bool isAggFuncType = false;

void init(const char * function_name, const DataTypes & argument_types)
{
Expand All @@ -87,15 +88,19 @@ class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
{
if (created)
return;
function->create(state.data());
if (isAggFuncType)
merged_column->insertDefault();
else
function->create(state.data());
created = true;
}

void destroyState()
{
if (!created)
return;
function->destroy(state.data());
if (!isAggFuncType)
function->destroy(state.data());
created = false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
1 6 3 3
1 6 3 3
1 6 [3,2]
1 6 [3,2]
1 0.5
1 0.5
1 0.1
1 0.1
0 333333 53
1 333333 53
2 333333 53
0 333333 53
1 333333 53
2 333333 53
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
drop table if exists test.summing_merge_tree_aggregate_function;
drop table if exists test.summing_merge_tree_null;

---- sum + uniq + uniqExact
create table test.summing_merge_tree_aggregate_function (
d materialized today(),
k UInt64,
c UInt64,
u AggregateFunction(uniq, UInt8),
ue AggregateFunction(uniqExact, UInt8)
) engine=SummingMergeTree(d, k, 8192);

insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(1), uniqExactState(1);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(2), uniqExactState(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(3), uniqExactState(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(1), uniqExactState(1);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(2), uniqExactState(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, uniqState(3), uniqExactState(3);

select
k, sum(c),
uniqMerge(u), uniqExactMerge(ue)
from test.summing_merge_tree_aggregate_function group by k;

optimize table test.summing_merge_tree_aggregate_function;

select
k, sum(c),
uniqMerge(u), uniqExactMerge(ue)
from test.summing_merge_tree_aggregate_function group by k;

drop table test.summing_merge_tree_aggregate_function;

---- sum + topK
create table test.summing_merge_tree_aggregate_function (d materialized today(), k UInt64, c UInt64, x AggregateFunction(topK(2), UInt8)) engine=SummingMergeTree(d, k, 8192);

insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(1);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(2);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(3);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(3);
insert into test.summing_merge_tree_aggregate_function select 1, 1, topKState(2)(3);
select k, sum(c), topKMerge(2)(x) from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, sum(c), topKMerge(2)(x) from test.summing_merge_tree_aggregate_function group by k;

drop table test.summing_merge_tree_aggregate_function;

---- avg
create table test.summing_merge_tree_aggregate_function (d materialized today(), k UInt64, x AggregateFunction(avg, Float64)) engine=SummingMergeTree(d, k, 8192);

insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.0);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.1);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.2);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.3);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.4);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.5);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.6);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.7);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.8);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(0.9);
insert into test.summing_merge_tree_aggregate_function select 1, avgState(1.0);
select k, avgMerge(x) from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, avgMerge(x) from test.summing_merge_tree_aggregate_function group by k;

drop table test.summing_merge_tree_aggregate_function;

---- quantile
create table test.summing_merge_tree_aggregate_function (d materialized today(), k UInt64, x AggregateFunction(quantile(0.1), Float64)) engine=SummingMergeTree(d, k, 8192);

insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.0);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.1);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.2);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.3);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.4);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.5);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.6);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.7);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.8);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(0.9);
insert into test.summing_merge_tree_aggregate_function select 1, quantileState(0.1)(1.0);
select k, quantileMerge(0.1)(x) from test.summing_merge_tree_aggregate_function group by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, quantileMerge(0.1)(x) from test.summing_merge_tree_aggregate_function group by k;

drop table test.summing_merge_tree_aggregate_function;

---- sum + uniq with more data
create table test.summing_merge_tree_null (
d materialized today(),
k UInt64,
c UInt64,
u UInt64
) engine=Null;

create materialized view test.summing_merge_tree_aggregate_function (
d materialized today(),
k UInt64,
c UInt64,
u AggregateFunction(uniq, UInt64)
) engine=SummingMergeTree(d, k, 8192)
as select d, k, sum(c) as c, uniqState(u) as u
from test.summing_merge_tree_null
group by d, k;

-- prime number 53 to avoid resonance between %3 and %53
insert into test.summing_merge_tree_null select number % 3, 1, number % 53 from numbers(999999);

select k, sum(c), uniqMerge(u) from test.summing_merge_tree_aggregate_function group by k order by k;
optimize table test.summing_merge_tree_aggregate_function;
select k, sum(c), uniqMerge(u) from test.summing_merge_tree_aggregate_function group by k order by k;

drop table test.summing_merge_tree_aggregate_function;
drop table test.summing_merge_tree_null;
2 changes: 1 addition & 1 deletion docs/en/table_engines/aggregatingmergetree.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ SELECT uniq(UserID) FROM table
SELECT uniqMerge(state) FROM (SELECT uniqState(UserID) AS state FROM table GROUP BY RegionID)
```

There is an ` AggregatingMergeTree` engine. Its job during a merge is to combine the states of aggregate functions from different table rows with the same primary key value.
There is an `AggregatingMergeTree` engine. Its job during a merge is to combine the states of aggregate functions from different table rows with the same primary key value.

You can't use a normal INSERT to insert a row in a table containing `AggregateFunction` columns, because you can't explicitly define the `AggregateFunction` value. Instead, use `INSERT SELECT` with `-State` aggregate functions for inserting data.

Expand Down
2 changes: 1 addition & 1 deletion docs/en/table_engines/summingmergetree.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The columns to total are set explicitly (the last parameter – Shows, Clicks, C

If the values were zero in all of these columns, the row is deleted. (The exception is the case when the data part would not have any rows left in it.)

For the other rows that are not part of the primary key, the first value that occurs is selected when merging.
For the other columns that are not part of the primary key, the first value that occurs is selected when merging. But if a column is of AggregateFunction type, then it is merged according to that function, which effectively makes this engine behave like `AggregatingMergeTree`.

Summation is not performed for a read operation. If it is necessary, write the appropriate GROUP BY.

Expand Down
2 changes: 1 addition & 1 deletion docs/ru/table_engines/summingmergetree.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, ...), 8192, (Shows, C

Если значения во всех таких столбцах оказались нулевыми, то строчка удаляется. (За исключением случаев, когда в куске данных не осталось бы ни одной строчки.)

Для остальных столбцов, не входящих в первичный ключ, при слиянии выбирается первое попавшееся значение.
Для остальных столбцов, не входящих в первичный ключ, при слиянии выбирается первое попавшееся значение. Но для столбцов типа AggregateFunction выполняется агрегация согласно заданной функции, так что этот движок фактически ведёт себя как `AggregatingMergeTree`.

При чтении, суммирование не делается само по себе. Если оно необходимо - напишите соответствующий GROUP BY.

Expand Down