diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 76c667f60ae140..ab4f9e8fc82c76 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -83,7 +83,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr tablet_schem } // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); - _row_in_blocks = std::make_unique>(); + _row_in_blocks = std::make_unique>>(); } void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, @@ -169,8 +169,6 @@ MemTable::~MemTable() { } } - std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(), - std::default_delete()); // Arena has to be destroyed after agg state, because some agg state's memory may be // allocated in arena. _arena.reset(); @@ -240,7 +238,7 @@ Status MemTable::insert(const vectorized::Block* input_block, RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), row_idxs.data() + num_rows, &_column_offset)); for (int i = 0; i < num_rows; i++) { - _row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock + i}); + _row_in_blocks->emplace_back(std::make_shared(cursor_in_mutableblock + i)); } _stat.raw_rows += num_rows; @@ -312,7 +310,7 @@ size_t MemTable::_sort() { // sort new rows Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size()); for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) { - auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { + auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int { return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); }; _sort_one_column(*_row_in_blocks, tie, cmp); @@ -323,16 +321,17 @@ size_t MemTable::_sort() { while (iter.next()) { pdqsort(std::next(_row_in_blocks->begin(), iter.left()), std::next(_row_in_blocks->begin(), iter.right()), - [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { + [&is_dup](const std::shared_ptr& lhs, + const std::shared_ptr& rhs) -> bool { return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos; }); same_keys_num += iter.right() - iter.left(); } // merge new rows and old rows _vec_row_comparator->set_block(&_input_mutable_block); - auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l, - const RowInBlock* r) -> bool { - auto value = (*(this->_vec_row_comparator))(l, r); + auto cmp_func = [this, is_dup, &same_keys_num](const std::shared_ptr& l, + const std::shared_ptr& r) -> bool { + auto value = (*(this->_vec_row_comparator))(l.get(), r.get()); if (value == 0) { same_keys_num++; return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos; @@ -356,14 +355,10 @@ Status MemTable::_sort_by_cluster_keys() { auto clone_block = in_block.clone_without_columns(); _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); - DorisVector row_in_blocks; - std::unique_ptr> row_in_blocks_deleter((int*)0x01, [&](int*) { - std::for_each(row_in_blocks.begin(), row_in_blocks.end(), - std::default_delete()); - }); + DorisVector> row_in_blocks; row_in_blocks.reserve(mutable_block.rows()); for (size_t i = 0; i < mutable_block.rows(); i++) { - row_in_blocks.emplace_back(new RowInBlock {i}); + row_in_blocks.emplace_back(std::make_shared(i)); } Tie tie = Tie(0, mutable_block.rows()); @@ -384,9 +379,8 @@ Status MemTable::_sort_by_cluster_keys() { while (iter.next()) { pdqsort(std::next(row_in_blocks.begin(), iter.left()), std::next(row_in_blocks.begin(), iter.right()), - [](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { - return lhs->_row_pos < rhs->_row_pos; - }); + [](const std::shared_ptr& lhs, const std::shared_ptr& rhs) + -> bool { return lhs->_row_pos < rhs->_row_pos; }); } in_block = mutable_block.to_block(); @@ -405,16 +399,16 @@ Status MemTable::_sort_by_cluster_keys() { row_pos_vec.data() + in_block.rows(), &column_offset); } -void MemTable::_sort_one_column(DorisVector& row_in_blocks, Tie& tie, - std::function cmp) { +void MemTable::_sort_one_column(DorisVector>& row_in_blocks, Tie& tie, + std::function cmp) { auto iter = tie.iter(); while (iter.next()) { pdqsort(std::next(row_in_blocks.begin(), static_cast(iter.left())), std::next(row_in_blocks.begin(), static_cast(iter.right())), - [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; }); + [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), rhs.get()) < 0; }); tie[iter.left()] = 0; for (auto i = iter.left() + 1; i < iter.right(); i++) { - tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0); + tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get()) == 0); } } } @@ -475,7 +469,7 @@ void MemTable::_aggregate() { vectorized::MutableBlock::build_mutable_block(&in_block); _vec_row_comparator->set_block(&mutable_block); auto& block_data = in_block.get_columns_with_type_and_name(); - DorisVector temp_row_in_blocks; + DorisVector> temp_row_in_blocks; temp_row_in_blocks.reserve(_last_sorted_pos); RowInBlock* prev_row = nullptr; int row_pos = -1; @@ -494,7 +488,8 @@ void MemTable::_aggregate() { }; if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) { - for (RowInBlock* cur_row : *_row_in_blocks) { + for (const auto& cur_row_ptr : *_row_in_blocks) { + RowInBlock* cur_row = cur_row_ptr.get(); if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { if (!prev_row->has_init_agg()) { init_for_agg(prev_row); @@ -505,15 +500,16 @@ void MemTable::_aggregate() { prev_row = cur_row; if (!temp_row_in_blocks.empty()) { // no more rows to merge for prev row, finalize it - _finalize_one_row(temp_row_in_blocks.back(), block_data, row_pos); + _finalize_one_row(temp_row_in_blocks.back().get(), block_data, + row_pos); } - temp_row_in_blocks.push_back(prev_row); + temp_row_in_blocks.push_back(cur_row_ptr); row_pos++; } } if (!temp_row_in_blocks.empty()) { // finalize the last low - _finalize_one_row(temp_row_in_blocks.back(), block_data, row_pos); + _finalize_one_row(temp_row_in_blocks.back().get(), block_data, row_pos); } } else { // For flexible partial update and the table has sequence column, considering the following situation: @@ -539,8 +535,9 @@ void MemTable::_aggregate() { row_without_seq_col = nullptr; } }; - auto add_row = [&](RowInBlock* row, bool with_seq_col) { - temp_row_in_blocks.push_back(row); + auto add_row = [&](const std::shared_ptr& row_ptr, bool with_seq_col) { + RowInBlock* row = row_ptr.get(); + temp_row_in_blocks.push_back(row_ptr); row_pos++; if (with_seq_col) { row_with_seq_col = row; @@ -553,7 +550,8 @@ void MemTable::_aggregate() { auto& skip_bitmaps = assert_cast( mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) ->get_data(); - for (auto* cur_row : *_row_in_blocks) { + for (const auto& cur_row_ptr : *_row_in_blocks) { + RowInBlock* cur_row = cur_row_ptr.get(); const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id); // compare keys, the keys of row_with_seq_col and row_with_seq_col is the same, @@ -562,7 +560,7 @@ void MemTable::_aggregate() { if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) { prev_row = (with_seq_col ? row_with_seq_col : row_without_seq_col); if (prev_row == nullptr) { - add_row(cur_row, with_seq_col); + add_row(cur_row_ptr, with_seq_col); continue; } if (!prev_row->has_init_agg()) { @@ -573,7 +571,7 @@ void MemTable::_aggregate() { } else { // no more rows to merge for prev rows, finalize them finalize_rows(); - add_row(cur_row, with_seq_col); + add_row(cur_row_ptr, with_seq_col); } } // finalize the last lows diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 0be8cdd695701e..637a1dcd545fd1 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -249,8 +249,8 @@ class MemTable { //return number of same keys size_t _sort(); Status _sort_by_cluster_keys(); - void _sort_one_column(DorisVector& row_in_blocks, Tie& tie, - std::function cmp); + void _sort_one_column(DorisVector>& row_in_blocks, Tie& tie, + std::function cmp); template void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, int row_pos); @@ -263,7 +263,7 @@ class MemTable { std::vector _agg_functions; std::vector _offsets_of_aggregate_states; size_t _total_size_of_aggregate_states; - std::unique_ptr> _row_in_blocks; + std::unique_ptr>> _row_in_blocks; size_t _num_columns; int32_t _seq_col_idx_in_block = -1;