Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ struct AggregateFunctionDistinctSingleNumericData {
using Self = AggregateFunctionDistinctSingleNumericData<T, stable>;
Container data;

// Discard every distinct value collected so far so this state can be reused.
void clear() {
    data.clear();
}

void add(const IColumn** columns, size_t /* columns_num */, size_t row_num, Arena*) {
const auto& vec = assert_cast<const ColumnVector<T>&>(*columns[0]).get_data();
if constexpr (stable) {
Expand Down Expand Up @@ -122,6 +124,8 @@ struct AggregateFunctionDistinctGenericData {
using Self = AggregateFunctionDistinctGenericData;
Container data;

// Drop all accumulated distinct entries, resetting this state for reuse.
void clear() {
    data.clear();
}

void merge(const Self& rhs, Arena* arena) {
DCHECK(!stable);
if constexpr (!stable) {
Expand Down Expand Up @@ -315,6 +319,15 @@ class AggregateFunctionDistinct
nested_func->add_batch_single_place(arguments[0]->size(), get_nested_place(place),
arguments_raw.data(), &arena);
nested_func->insert_result_into(get_nested_place(place), to);
// For distinct aggregate functions, the real computation happens in the
// add_batch_single_place call just above, as the final step of insert_result_into.
// When a distinct aggregate is combined with an over() window, insert_result_into
// is invoked many times with different rows, so we must clear the accumulated
// data here to keep it from leaking into the next insert_result_into call.
this->data(place).clear();
}

// Reset this aggregate state to its initial (empty) condition.
// Clears the distinct-value container owned by this wrapper, then delegates
// to the wrapped (nested) aggregate function so its own state is reset too.
// NOTE(review): assumes get_nested_place(place) points at the nested state
// embedded after this wrapper's prefix — consistent with size_of_data below.
void reset(AggregateDataPtr place) const override {
this->data(place).clear();
nested_func->reset(get_nested_place(place));
}

size_t size_of_data() const override { return prefix_size + nested_func->size_of_data(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,23 +505,23 @@ sichuan [{"cbe":{},"notnull":0,"null":1,"all":1}]
3 4

-- !agg_window_multi_distinct_sum --
1 11320987615.0000000119
1 1617283945.0000000017
1 3234567890.0000000034
1 4851851835.0000000051
1 6469135780.0000000068
1 8086419725.0000000085
1 9703703670.0000000102
1 1617283945.0000000017
1 1617283945.0000000017
1 1617283945.0000000017
1 1617283945.0000000017
1 1617283945.0000000017
1 1617283945.0000000017
2 1217283945.0000000026
2 1217283945.0000000026
2 2434567890.0000000052
2 3651851835.0000000078
2 4869135780.0000000104
2 6086419725.0000000130
2 1217283945.0000000026
2 1217283945.0000000026
2 1217283945.0000000026
3 1093827157.0000000020
3 1093827157.0000000020
3 1093827157.0000000020
3 1093827157.0000000020
3 1093827157.0000000020
3 2187654314.0000000040
3 3281481471.0000000060
3 4375308628.0000000080
3 5469135785.0000000100

-- !agg_window_bitmap_union --
1 1,2,3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,21 @@
2 1243.500
3 24453.325

-- !sql_window_muti1 --
a
a
a
a

-- !sql_window_muti2 --
1
1
1
1

-- !sql_window_muti3 --
1
1
1
1

Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,27 @@ suite("test_sum") {
(partition by k1 order by k3 range between current row and unbounded following) as w
from test_query_db.test order by k1, w
"""

sql "create database if not exists multi_db"
sql "use multi_db"
sql "DROP TABLE IF EXISTS multi"
sql """
CREATE TABLE multi (
id int,
v1 int,
v2 varchar
) ENGINE = OLAP
DUPLICATE KEY(id) COMMENT 'OLAP'
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """
insert into multi values (1, 1, 'a'),(1, 1, 'a'), (2, 1, 'a'), (3, 1, 'a');
"""
qt_sql_window_muti1 """ select multi_distinct_group_concat(v2) over() from multi; """
qt_sql_window_muti2 """ select multi_distinct_sum(v1) over() from multi; """
qt_sql_window_muti3 """ select multi_distinct_count(v1) over() from multi; """
}

Loading