Skip to content

Commit

Permalink
fix multishard incorrect distinct result
Browse files Browse the repository at this point in the history
  • Loading branch information
Jasmine-ge committed Aug 20, 2024
1 parent c52243c commit 57973ce
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 29 deletions.
117 changes: 92 additions & 25 deletions src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ struct AggregateFunctionDistinctSingleNumericData
/// Optimized, put the new coming data that the set does not have into extra_data_since_last_finalize.
std::vector<T> extra_data_since_last_finalize;

// If has new data
/// If has new data
bool has_new_data = false;

/// A marker used to classify the current block:
/// 0 means a normal data block, 1 means a special data block whose extra data has been deleted, 2 means a blank block.
int is_blank = 0;

void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena *)
{
const auto & vec = assert_cast<const ColumnVector<T> &>(*columns[0]).getData();
Expand All @@ -36,35 +40,86 @@ struct AggregateFunctionDistinctSingleNumericData
has_new_data = true;
extra_data_since_last_finalize.emplace_back(vec[row_num]);
}
if (is_blank == 2)
{
is_blank = 1;
}
}

void merge(const Self & rhs, Arena *)
{
if (rhs.extra_data_since_last_finalize.size())
/// mark out the blank blocks
if (is_blank != 2 && !extra_data_since_last_finalize.size())
{
for (const auto & data : rhs.extra_data_since_last_finalize)
is_blank = 2;
}
/// If the rhs block is in mode 1, merge its set and delete any data that exists in both the current extra data and the set.
if (rhs.is_blank == 1)
{
for (const auto & data : rhs.set)
{
auto [_, inserted] = set.insert(data);
if (inserted)
extra_data_since_last_finalize.emplace_back(data);
double value = data.getValue();
auto it = std::find(extra_data_since_last_finalize.begin(), extra_data_since_last_finalize.end(), value);
if (it != extra_data_since_last_finalize.end())
{
extra_data_since_last_finalize.erase(it);
}

}
if (rhs.extra_data_since_last_finalize.size())
{
for (const auto & data : rhs.extra_data_since_last_finalize)
{
auto [_, inserted] = set.insert(data);
if (inserted)
{
extra_data_since_last_finalize.emplace_back(data);
}
}
}
set.merge(rhs.set);
}
/**
* Under what circumstances will extra_data_since_last_finalize.size() be zero but has_new_data be true?
* Only in the first round of inserting data into multi-shard stream.
* For example: create stream test(id int, value int) settings shards=3;
* select count_distinct(value) from test;
* insert into test(id, value) values (3, 30), (4, 40);
* when execute the 'insert' command, it will trigger merge function, because in Aggregator::mergeSingleLevelDataImpl(...),
* there is a variable 'non_empty_data' to indicate if the other shard has data, and then call merge function.
* Since we are in the first round of inserting data, the other shard has no data, then in the insertResultIntoImpl function,
* the extra_data_since_last_finalize will be cleared. But actually, we do have new data.
* So it is just a special case and will just happen once.
*/
else if (rhs.has_new_data)
else
{
set.merge(rhs.set);
if (rhs.extra_data_since_last_finalize.size())
{
for (const auto & data : rhs.extra_data_since_last_finalize)
{
auto [_, inserted] = set.insert(data);
if (inserted)
{
extra_data_since_last_finalize.emplace_back(data);
}
}
}
/**
* Under what circumstances will extra_data_since_last_finalize.size() be zero but has_new_data be true?
* Only in the first round of inserting data into multi-shard stream.
* For example: create stream test(id int, value int) settings shards=3;
* select count_distinct(value) from test;
* insert into test(id, value) values (3, 30), (4, 40);
* when execute the 'insert' command, it will trigger merge function, because in Aggregator::mergeSingleLevelDataImpl(...),
* there is a variable 'non_empty_data' to indicate if the other shard has data, and then call merge function.
* Since we are in the first round of inserting data, the other shard has no data, then in the insertResultIntoImpl function,
* the extra_data_since_last_finalize will be cleared. But actually, we do have new data.
* So it is just a special case and will just happen once.
*/
else if (rhs.has_new_data)
{
for (const auto & data : rhs.set)
{
double value = data.getValue();
auto it = std::find(extra_data_since_last_finalize.begin(), extra_data_since_last_finalize.end(), value);
if (it != extra_data_since_last_finalize.end())
{
extra_data_since_last_finalize.erase(it);
}

}
set.merge(rhs.set);
}
}

}

void serialize(WriteBuffer & buf) const
Expand Down Expand Up @@ -103,13 +158,18 @@ struct AggregateFunctionDistinctGenericData
/// Optimized, put the new coming data that the set does not have into extra_data_since_last_finalize.
std::vector<StringRef> extra_data_since_last_finalize;
bool has_new_data = false;

int is_blank = 0;

void merge(const Self & rhs, Arena * arena)
{
Set::LookupResult it;
bool inserted;

if (is_blank != 2 && !extra_data_since_last_finalize.size()) {
is_blank = 2;
}
if (rhs.is_blank == 1) {
set.merge(rhs.set);
}
if (rhs.extra_data_since_last_finalize.size())
{
for (const auto & data : rhs.extra_data_since_last_finalize)
Expand Down Expand Up @@ -168,10 +228,14 @@ struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDist
bool inserted;
auto key_holder = getKeyHolder<is_plain_column>(*columns[0], row_num, *arena);
set.emplace(key_holder, it, inserted);

if (inserted)
{
assert(it);
has_new_data = true;
if (is_blank == 2) {
is_blank = 1;
}
extra_data_since_last_finalize.emplace_back(it->getValue());
}
}
Expand Down Expand Up @@ -211,6 +275,9 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
{
assert(it);
has_new_data = true;
if (is_blank == 2) {
is_blank = 1;
}
extra_data_since_last_finalize.emplace_back(it->getValue());
}
}
Expand Down Expand Up @@ -301,10 +368,10 @@ class AggregateFunctionDistinct : public IAggregateFunctionDataHelper<Data, Aggr
else
nested_func->insertResultInto(getNestedPlace(place), to, arena);

/// proton: starts. Next finalization will use extra data, used in streaming global aggregation query.
// this->data(place).use_extra_data = true;
/// only the blank block's extra data will be cleaned
this->data(place).extra_data_since_last_finalize.clear();
/// proton: ends.
this->data(place).is_blank = 2;

}

void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
Expand Down
5 changes: 1 addition & 4 deletions tests/stream/test_stream_smoke/0012_multishards7.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -467,13 +467,11 @@ tests:
wait: 1
query: |
drop stream if exists test13_multishard_7;
- client: python
query_type: table
wait: 1
query: |
create stream if not exists test13_multishard_7(id int, val int) settings shards=3, sharding_expr='to_int(id)';
- client: python
query_id: '1364'
depends_on_stream: test13_multishard_7
Expand All @@ -498,9 +496,8 @@ tests:
wait: 3
query: |
drop stream if exists test13_multishard_7;
expected_results:
- query_id: '1364'
expected_results:
- [70, 70]
- [70, 140]
- [70, 140]

0 comments on commit 57973ce

Please sign in to comment.