Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/issue 425 multishard incorrect result on distinct #784

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 88 additions & 96 deletions src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
#include <Columns/ColumnArray.h>
#include <Common/assert_cast.h>
#include <Common/HashTable/HashSet.h>
#include <Common/serde.h>

namespace DB
{
namespace Streaming
{

template <typename T>
struct AggregateFunctionDistinctSingleNumericData
{
Expand All @@ -19,84 +19,73 @@ struct AggregateFunctionDistinctSingleNumericData
using Self = AggregateFunctionDistinctSingleNumericData<T>;
Set set;

/// proton: starts. Resolve multiple finalizations problem for streaming global aggregation query
/// Resolve multiple finalizations problem for streaming global aggregation query
/// Optimized, put the new coming data that the set does not have into extra_data_since_last_finalize.
std::vector<T> extra_data_since_last_finalize;
bool use_extra_data = false; /// Optimized, only streaming global aggregation queries need to use extra data after the first finalization.
/// proton: ends.

NO_SERDE std::vector<uintptr_t> merged_places;

/// not used, but is kept for backward compatibility
bool use_extra_data = false;

void add(const IColumn ** columns, size_t /* columns_num */, size_t row_num, Arena *)
{
const auto & vec = assert_cast<const ColumnVector<T> &>(*columns[0]).getData();
/// proton: starts.
auto [_, inserted] = set.insert(vec[row_num]);
if (use_extra_data && inserted)
if (inserted)
extra_data_since_last_finalize.emplace_back(vec[row_num]);
/// proton: ends.

Jasmine-ge marked this conversation as resolved.
Show resolved Hide resolved
}

void merge(const Self & rhs, Arena *)
{
/// proton: starts.
if (rhs.use_extra_data)
{
for (const auto & data : rhs.extra_data_since_last_finalize)
{
auto [_, inserted] = set.insert(data);
if (use_extra_data && inserted)
extra_data_since_last_finalize.emplace_back(data);
}
}
else if (use_extra_data)
/// Deduplicate owned extra data based on rhs
for (auto it = extra_data_since_last_finalize.begin(); it != extra_data_since_last_finalize.end();)
{
for (const auto & elem : rhs.set)
{
auto [_, inserted] = set.insert(elem.getValue());
if (inserted)
extra_data_since_last_finalize.emplace_back(elem.getValue());
}
if (rhs.set.find(*it) != rhs.set.end())
it = extra_data_since_last_finalize.erase(it);
else
++it;
}
else

/// Merge and deduplicate rhs' extra data
for (const auto & data : rhs.extra_data_since_last_finalize)
{
set.merge(rhs.set);
auto [_, inserted] = set.insert(data);
if (inserted)
extra_data_since_last_finalize.emplace_back(data);
}
/// proton: ends.

set.merge(rhs.set);
Jasmine-ge marked this conversation as resolved.
Show resolved Hide resolved

uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(),merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);

}

void serialize(WriteBuffer & buf) const
{
set.write(buf);
/// proton: starts.
writeVectorBinary(extra_data_since_last_finalize, buf);
writeBoolText(use_extra_data, buf);
/// proton: ends.
}

void deserialize(ReadBuffer & buf, Arena *)
{
set.read(buf);
/// proton: starts.
readVectorBinary(extra_data_since_last_finalize, buf);
readBoolText(use_extra_data, buf);
/// proton: ends.
}

MutableColumns getArguments(const DataTypes & argument_types) const
{
MutableColumns argument_columns;
argument_columns.emplace_back(argument_types[0]->createColumn());

/// proton: starts.
if (use_extra_data)
{
for (const auto & data : extra_data_since_last_finalize)
argument_columns[0]->insert(data);
}
else
{
for (const auto & elem : set)
argument_columns[0]->insert(elem.getValue());
}
/// proton: ends.
for (const auto & data : extra_data_since_last_finalize)
argument_columns[0]->insert(data);

return argument_columns;
}
Expand All @@ -108,27 +97,45 @@ struct AggregateFunctionDistinctGenericData
using Set = HashSetWithSavedHashWithStackMemory<StringRef, StringRefHash, 4>;
using Self = AggregateFunctionDistinctGenericData;
Set set;
/// proton: starts. Resolve multiple finalizations problem for streaming global aggregation query
/// Resolve multiple finalizations problem for streaming global aggregation query
/// Optimized, put the new coming data that the set does not have into extra_data_since_last_finalize.
std::vector<StringRef> extra_data_since_last_finalize;
bool use_extra_data = false; /// Optimized, only streaming global aggregation queries need to use extra data after the first finalization.
/// proton: ends.

NO_SERDE std::vector<uintptr_t> merged_places;

bool use_extra_data = false;

void merge(const Self & rhs, Arena * arena)
{
Set::LookupResult it;
bool inserted;
for (const auto & elem : rhs.set)
/// proton: starts.
/// Deduplicate owned extra data based on rhs
for (auto next = extra_data_since_last_finalize.begin(); next != extra_data_since_last_finalize.end();)
{
set.emplace(ArenaKeyHolder{elem.getValue(), *arena}, it, inserted);
if (rhs.set.find(*next) != rhs.set.end())
next = extra_data_since_last_finalize.erase(next);
else
++next;
}

if (use_extra_data && inserted)
/// Merge and deduplicate rhs' extra data
for (const auto & data : rhs.extra_data_since_last_finalize)
{
set.emplace(ArenaKeyHolder{data, *arena}, it, inserted);
if (inserted)
{
assert(it);
extra_data_since_last_finalize.emplace_back(it->getValue());
}
}
/// proton: ends.

set.merge(rhs.set);

uintptr_t merged_place = reinterpret_cast<uintptr_t>(&rhs);
auto find_place = std::find(merged_places.begin(), merged_places.end(),merged_place);
if (find_place == merged_places.end())
merged_places.emplace_back(merged_place);

}

void serialize(WriteBuffer & buf) const
Expand All @@ -137,13 +144,11 @@ struct AggregateFunctionDistinctGenericData
for (const auto & elem : set)
writeStringBinary(elem.getValue(), buf);

/// proton: starts.
writeVarUInt(extra_data_since_last_finalize.size(), buf);
for (const auto & data : extra_data_since_last_finalize)
writeStringBinary(data, buf);

writeBoolText(use_extra_data, buf);
/// proton: ends.
}

void deserialize(ReadBuffer & buf, Arena * arena)
Expand All @@ -153,15 +158,13 @@ struct AggregateFunctionDistinctGenericData
for (size_t i = 0; i < size; ++i)
set.insert(readStringBinaryInto(*arena, buf));

/// proton: starts.
size_t extra_size;
readVarUInt(extra_size, buf);
extra_data_since_last_finalize.resize(extra_size);
for (size_t i = 0; i < extra_size; ++i)
extra_data_since_last_finalize[i] = readStringBinaryInto(*arena, buf);

readBoolText(use_extra_data, buf);
/// proton: ends.
}
};

Expand All @@ -174,33 +177,21 @@ struct AggregateFunctionDistinctSingleGenericData : public AggregateFunctionDist
bool inserted;
auto key_holder = getKeyHolder<is_plain_column>(*columns[0], row_num, *arena);
set.emplace(key_holder, it, inserted);

/// proton: starts.
if (use_extra_data && inserted)

if (inserted)
{
assert(it);
extra_data_since_last_finalize.emplace_back(it->getValue());
}
/// proton: ends.
}

MutableColumns getArguments(const DataTypes & argument_types) const
{
MutableColumns argument_columns;
argument_columns.emplace_back(argument_types[0]->createColumn());

/// proton: starts.
if (use_extra_data)
{
for (const auto & data : extra_data_since_last_finalize)
deserializeAndInsert<is_plain_column>(data, *argument_columns[0]);
}
else
{
for (const auto & elem : set)
deserializeAndInsert<is_plain_column>(elem.getValue(), *argument_columns[0]);
}
/// proton: ends.
for (const auto & data : extra_data_since_last_finalize)
deserializeAndInsert<is_plain_column>(data, *argument_columns[0]);

return argument_columns;
}
Expand All @@ -224,13 +215,11 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
auto key_holder = SerializedKeyHolder{value, *arena};
set.emplace(key_holder, it, inserted);

/// proton: starts.
if (use_extra_data && inserted)
if (inserted)
{
assert(it);
extra_data_since_last_finalize.emplace_back(it->getValue());
}
/// proton: ends.
}

MutableColumns getArguments(const DataTypes & argument_types) const
Expand All @@ -239,26 +228,12 @@ struct AggregateFunctionDistinctMultipleGenericData : public AggregateFunctionDi
for (size_t i = 0; i < argument_types.size(); ++i)
argument_columns[i] = argument_types[i]->createColumn();

/// proton: starts.
if (use_extra_data)
{
for (const auto & data : extra_data_since_last_finalize)
{
const char * begin = data.data;
for (auto & column : argument_columns)
begin = column->deserializeAndInsertFromArena(begin);
}
}
else
for (const auto & data : extra_data_since_last_finalize)
{
for (const auto & elem : set)
{
const char * begin = elem.getValue().data;
for (auto & column : argument_columns)
begin = column->deserializeAndInsertFromArena(begin);
}
const char * begin = data.data;
for (auto & column : argument_columns)
begin = column->deserializeAndInsertFromArena(begin);
}
/// proton: ends.

return argument_columns;
}
Expand Down Expand Up @@ -327,16 +302,33 @@ class AggregateFunctionDistinct : public IAggregateFunctionDataHelper<Data, Aggr
arguments_raw[i] = arguments[i].get();

assert(!arguments.empty());
/// Accumulation for current data block
nested_func->addBatchSinglePlace(0, arguments[0]->size(), getNestedPlace(place), arguments_raw.data(), arena);
if constexpr (MergeResult)
nested_func->insertMergeResultInto(getNestedPlace(place), to, arena);
else
nested_func->insertResultInto(getNestedPlace(place), to, arena);

/// proton: starts. Next finalization will use extra data, used in streaming global aggregation query.
this->data(place).use_extra_data = true;
this->data(place).extra_data_since_last_finalize.clear();
/// proton: ends.

/// Clear all the extra data in related blocks
for (auto & item : this->data(place).merged_places)
this->data(reinterpret_cast<AggregateDataPtr>(item)).extra_data_since_last_finalize.clear();

/* Add the temp data to the sum so that the distinct outcome can accumulate.
The block we are using here may be a blank block, so we need to find the data block and store the sum in it.
For example: if the current sum is 70, the value stored in sum{} of the blank block will be 70, but it will be
deleted, so we look up the first block recorded in merged_places and put 70 into its sum
using addBatchSinglePlace.
*/
Jasmine-ge marked this conversation as resolved.
Show resolved Hide resolved
if(this->data(place).merged_places.size())
nested_func->addBatchSinglePlace(0, arguments[0]->size(), getNestedPlace(reinterpret_cast<AggregateDataPtr>(this->data(place).merged_places[0])), arguments_raw.data(), arena);

if (this->data(place).extra_data_since_last_finalize.size())
this->data(place).extra_data_since_last_finalize.clear();

if (this->data(place).merged_places.size())
this->data(place).merged_places.clear();

Jasmine-ge marked this conversation as resolved.
Show resolved Hide resolved
}

void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
Expand Down
Loading
Loading