From b59f9a1a320d3e7d2435b277677e6c8435bd449b Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Fri, 20 Jun 2025 13:23:28 +0800 Subject: [PATCH] [Bug](aggregate) fix bitmap_union return error result in query sql --- .../aggregate_function_bitmap.h | 4 + .../operator/streaming_agg_operator_test.cpp | 91 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h index a442fc3e1c55b2..0c7b41f694c8f9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h @@ -50,6 +50,7 @@ struct AggregateFunctionBitmapUnionOp { template static void add(BitmapValue& res, const T& data, bool& is_first) { res.add(data); + is_first = false; } static void add(BitmapValue& res, const BitmapValue& data, bool& is_first) { @@ -63,6 +64,9 @@ struct AggregateFunctionBitmapUnionOp { static void add_batch(BitmapValue& res, std::vector& data, bool& is_first) { res.fastunion(data); + // after fastunion, res myabe have many datas, so is_first should be false + // then call add function will not reset res + is_first = false; } static void merge(BitmapValue& res, const BitmapValue& data, bool& is_first) { diff --git a/be/test/pipeline/operator/streaming_agg_operator_test.cpp b/be/test/pipeline/operator/streaming_agg_operator_test.cpp index 86c2b973b4e250..8073370c2c5863 100644 --- a/be/test/pipeline/operator/streaming_agg_operator_test.cpp +++ b/be/test/pipeline/operator/streaming_agg_operator_test.cpp @@ -29,7 +29,9 @@ #include "testutil/mock/mock_agg_fn_evaluator.h" #include "testutil/mock/mock_runtime_state.h" #include "testutil/mock/mock_slot_ref.h" +#include "util/bitmap_value.h" #include "util/jsonb_document.h" +#include "vec/data_types/data_type_bitmap.h" #include "vec/data_types/data_type_number.h" namespace doris::pipeline { @@ -315,4 +317,93 @@ TEST_F(StreamingAggOperatorTest, test3) { { EXPECT_TRUE(local_state->close(state.get()).ok()); } } +TEST_F(StreamingAggOperatorTest, test4) { + op->_aggregate_evaluators.push_back(vectorized::create_agg_fn( + pool, "bitmap_union", {std::make_shared()}, false)); + op->_pool = &pool; + op->_needs_finalize = false; + op->_is_merge = false; + + EXPECT_TRUE(op->set_child(child_op)); + + EXPECT_TRUE(op->prepare(state.get()).ok()); + op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts( + 1, std::make_shared(std::make_shared())); + + { + auto local_state = std::make_unique(state.get(), op.get()); + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = {}, + .shared_state = nullptr, + .shared_state_map = {}, + .task_idx = 0}; + + EXPECT_TRUE(local_state->init(state.get(), info).ok()); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(op->operator_id(), std::move(local_state)); + } + + { + local_state = + static_cast(state->get_local_state(op->operator_id())); + EXPECT_TRUE(local_state->open(state.get()).ok()); + } + + { + std::vector bitmaps = {BitmapValue(1), BitmapValue(2), BitmapValue(3), + BitmapValue(4), BitmapValue(5), BitmapValue(6)}; + + vectorized::Block block { + ColumnHelper::create_column_with_name(bitmaps), + ColumnHelper::create_nullable_column_with_name( + {1, 1, 2, 2, 2, 3}, {false, false, false, false, false, true})}; + local_state->should_not_do_pre_agg = false; + local_state->_should_expand_hash_table = true; + std::cout << block.dump_data() << std::endl; + auto st = op->push(state.get(), &block, true); + EXPECT_TRUE(st.ok()) << st.msg(); + + EXPECT_EQ(local_state->_get_hash_table_size(), 3); + EXPECT_TRUE(op->need_more_input_data(state.get())); + } + + { + local_state->should_not_do_pre_agg = false; + local_state->_should_expand_hash_table = false; + std::vector bitmaps2 = {BitmapValue(6), BitmapValue(7), BitmapValue(8), + BitmapValue(9), BitmapValue(10), BitmapValue(11)}; + vectorized::Block block { + ColumnHelper::create_column_with_name(bitmaps2), + ColumnHelper::create_nullable_column_with_name( + {2, 2, 2, 2, 4, 4}, {false, false, false, false, false, false})}; + std::cout << block.dump_data() << std::endl; + auto st = op->push(state.get(), &block, true); + EXPECT_TRUE(st.ok()) << st.msg(); + + EXPECT_EQ(local_state->_get_hash_table_size(), 4); + EXPECT_TRUE(op->need_more_input_data(state.get())); + } + + { + bool eos = false; + vectorized::Block block; + auto st = op->pull(state.get(), &block, &eos); + std::cout << block.dump_data() << std::endl; + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_TRUE(eos); + EXPECT_EQ(block.rows(), 4); + std::vector bitmaps_res = {BitmapValue({1, 2}), + BitmapValue({3, 4, 5, 6, 7, 8, 9}), + BitmapValue({10, 11}), BitmapValue(6)}; + vectorized::Block res_block { + ColumnHelper::create_nullable_column_with_name( + {1, 2, 4, 5}, {false, false, false, true}), + ColumnHelper::create_column_with_name(bitmaps_res)}; + EXPECT_TRUE(ColumnHelper::block_equal_with_sort(block, res_block)) + << "Expected: " << res_block.dump_data() << ", but got: " << block.dump_data(); + } + + { EXPECT_TRUE(local_state->close(state.get()).ok()); } +} + } // namespace doris::pipeline