From cceecddbfe02aa104ef27d39111cf9f3aeaad704 Mon Sep 17 00:00:00 2001 From: Jasmine-ge Date: Mon, 2 Sep 2024 11:06:28 +0800 Subject: [PATCH 1/2] fix the problem of multishard incorrect result on distinct --- .../Streaming/AggregateFunctionDistinct.h | 14 +++++++++++++- .../test_stream_smoke/0012_multishards7.yaml | 8 ++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h b/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h index f763fd409c..b1cda89298 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h @@ -107,9 +107,21 @@ struct AggregateFunctionDistinctSingleNumericData for (auto it = extra_data_since_last_finalize.begin(); it != extra_data_since_last_finalize.end();) { if (rhs.set.find(*it) != rhs.set.end()) - it = extra_data_since_last_finalize.erase(it); + { + auto found = std::find(rhs.extra_data_since_last_finalize.begin(), rhs.extra_data_since_last_finalize.end(), *it); + if (found == rhs.extra_data_since_last_finalize.end()) + { + it = extra_data_since_last_finalize.erase(it); + } + else + { + ++it; + } + } else + { ++it; + } } /// Merge and deduplicate rhs' extra data diff --git a/tests/stream/test_stream_smoke/0012_multishards7.yaml b/tests/stream/test_stream_smoke/0012_multishards7.yaml index ef87bb5944..642fe57681 100644 --- a/tests/stream/test_stream_smoke/0012_multishards7.yaml +++ b/tests/stream/test_stream_smoke/0012_multishards7.yaml @@ -606,14 +606,14 @@ tests: query_type: table depends_on: '1366' wait: 1 - query: insert into test13_multishard_7(id, val) values(1, 30), (2, 40), (3, 60) + query: insert into test13_multishard_7(id, val) values(1, 40), (1, 30) - client: python query_type: table kill: '1366' kill_wait: 2 wait: 1 - query: insert into test13_multishard_7(id, val) values(1, 30), (1, 40), (1, 60), (2, 40), (3, 40) + query: insert into test13_multishard_7(id, val) values(1, 30), (1, 50), (2, 50), (3, 50) - client: python query_type: table @@ -624,5 +624,5 @@ tests: expected_results: - query_id: '1366' expected_results: - - [130, 130] - - [130, 340] + - [70, 70] + - [120, 250] From 3ae578ddd817a4a81da0d012224a1493dd0296bc Mon Sep 17 00:00:00 2001 From: Jasmine-ge Date: Mon, 2 Sep 2024 12:02:38 +0800 Subject: [PATCH 2/2] code revise --- .../Streaming/AggregateFunctionDistinct.h | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h b/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h index b1cda89298..170abe626e 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h +++ b/src/AggregateFunctions/Streaming/AggregateFunctionDistinct.h @@ -108,15 +108,12 @@ struct AggregateFunctionDistinctSingleNumericData { if (rhs.set.find(*it) != rhs.set.end()) { - auto found = std::find(rhs.extra_data_since_last_finalize.begin(), rhs.extra_data_since_last_finalize.end(), *it); - if (found == rhs.extra_data_since_last_finalize.end()) - { - it = extra_data_since_last_finalize.erase(it); - } - else - { + bool is_new_data = std::find(rhs.extra_data_since_last_finalize.begin(), rhs.extra_data_since_last_finalize.end(), *it) + != rhs.extra_data_since_last_finalize.end(); + if (is_new_data) ++it; - } + else + it = extra_data_since_last_finalize.erase(it); } else { @@ -188,9 +185,18 @@ struct AggregateFunctionDistinctGenericData for (auto next = extra_data_since_last_finalize.begin(); next != extra_data_since_last_finalize.end();) { if (rhs.set.find(*next) != rhs.set.end()) - next = extra_data_since_last_finalize.erase(next); + { + bool is_new_data = std::find(rhs.extra_data_since_last_finalize.begin(), rhs.extra_data_since_last_finalize.end(), *next) + != rhs.extra_data_since_last_finalize.end(); + if (is_new_data) + ++next; + else + next = extra_data_since_last_finalize.erase(next); + } else + { ++next; + } } /// Merge and deduplicate rhs' extra data