diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index b0d930e6d09eaf..1bc3960dd6dfcc 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -156,8 +156,6 @@ MemTable::~MemTable() { } } } - std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), - std::default_delete()); // Arena has to be destroyed after agg state, because some agg state's memory may be // allocated in arena. _arena.reset(); @@ -216,7 +214,7 @@ Status MemTable::insert(const vectorized::Block* input_block, RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), row_idxs.data() + num_rows, &_column_offset)); for (int i = 0; i < num_rows; i++) { - _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); + _row_in_blocks.emplace_back(std::make_shared(cursor_in_mutableblock + i)); } _stat.raw_rows += num_rows; @@ -264,7 +262,7 @@ size_t MemTable::_sort() { // sort new rows Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size()); for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) { - auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { + auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int { return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); }; _sort_one_column(_row_in_blocks, tie, cmp); @@ -275,16 +273,17 @@ size_t MemTable::_sort() { while (iter.next()) { pdqsort(std::next(_row_in_blocks.begin(), iter.left()), std::next(_row_in_blocks.begin(), iter.right()), - [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { + [&is_dup](const std::shared_ptr& lhs, + const std::shared_ptr& rhs) -> bool { return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos; }); same_keys_num += iter.right() - iter.left(); } // merge new rows and old rows _vec_row_comparator->set_block(&_input_mutable_block); - auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l, - const RowInBlock* r) -> bool { - auto value = (*(this->_vec_row_comparator))(l, r); + auto cmp_func = [this, is_dup, &same_keys_num](const std::shared_ptr& l, + const std::shared_ptr& r) -> bool { + auto value = (*(this->_vec_row_comparator))(l.get(), r.get()); if (value == 0) { same_keys_num++; return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos; @@ -308,14 +307,10 @@ Status MemTable::_sort_by_cluster_keys() { auto clone_block = in_block.clone_without_columns(); _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); - std::vector row_in_blocks; - std::unique_ptr> row_in_blocks_deleter((int*)0x01, [&](int*) { - std::for_each(row_in_blocks.begin(), row_in_blocks.end(), - std::default_delete()); - }); + std::vector> row_in_blocks; row_in_blocks.reserve(mutable_block.rows()); for (size_t i = 0; i < mutable_block.rows(); i++) { - row_in_blocks.emplace_back(new RowInBlock {i}); + row_in_blocks.emplace_back(std::make_shared(i)); } Tie tie = Tie(0, mutable_block.rows()); @@ -336,9 +331,8 @@ Status MemTable::_sort_by_cluster_keys() { while (iter.next()) { pdqsort(std::next(row_in_blocks.begin(), iter.left()), std::next(row_in_blocks.begin(), iter.right()), - [](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { - return lhs->_row_pos < rhs->_row_pos; - }); + [](const std::shared_ptr& lhs, const std::shared_ptr& rhs) + -> bool { return lhs->_row_pos < rhs->_row_pos; }); } in_block = mutable_block.to_block(); @@ -353,16 +347,16 @@ Status MemTable::_sort_by_cluster_keys() { row_pos_vec.data() + in_block.rows(), &_column_offset); } -void MemTable::_sort_one_column(std::vector& row_in_blocks, Tie& tie, - std::function cmp) { +void MemTable::_sort_one_column(std::vector>& row_in_blocks, Tie& tie, + std::function cmp) { auto iter = tie.iter(); while (iter.next()) { - pdqsort(std::next(row_in_blocks.begin(), iter.left()), - std::next(row_in_blocks.begin(), iter.right()), - [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; }); + pdqsort(std::next(row_in_blocks.begin(), static_cast(iter.left())), + std::next(row_in_blocks.begin(), static_cast(iter.right())), + [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), rhs.get()) < 0; }); tie[iter.left()] = 0; - for (int i = iter.left() + 1; i < iter.right(); i++) { - tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0); + for (auto i = iter.left() + 1; i < iter.right(); i++) { + tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get()) == 0); } } } @@ -423,14 +417,14 @@ void MemTable::_aggregate() { vectorized::MutableBlock::build_mutable_block(&in_block); _vec_row_comparator->set_block(&mutable_block); auto& block_data = in_block.get_columns_with_type_and_name(); - std::vector temp_row_in_blocks; + std::vector> temp_row_in_blocks; temp_row_in_blocks.reserve(_last_sorted_pos); RowInBlock* prev_row = nullptr; int row_pos = -1; //only init agg if needed - for (int i = 0; i < _row_in_blocks.size(); i++) { - if (!temp_row_in_blocks.empty() && - (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) { + for (const auto& row_ptr : _row_in_blocks) { + RowInBlock* current_row = row_ptr.get(); + if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, current_row) == 0) { if (!prev_row->has_init_agg()) { prev_row->init_agg_places( _arena->aligned_alloc(_total_size_of_aggregate_states, 16), @@ -445,20 +439,20 @@ void MemTable::_aggregate() { } } _stat.merged_rows++; - _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i], prev_row); + _aggregate_two_row_in_block(mutable_block, current_row, prev_row); } else { - prev_row = _row_in_blocks[i]; + prev_row = current_row; if (!temp_row_in_blocks.empty()) { // no more rows to merge for prev row, finalize it - _finalize_one_row(temp_row_in_blocks.back(), block_data, row_pos); + _finalize_one_row(temp_row_in_blocks.back().get(), block_data, row_pos); } - temp_row_in_blocks.push_back(prev_row); + temp_row_in_blocks.push_back(row_ptr); row_pos++; } } if (!temp_row_in_blocks.empty()) { - // finalize the last low - _finalize_one_row(temp_row_in_blocks.back(), block_data, row_pos); + // finalize the last row + _finalize_one_row(temp_row_in_blocks.back().get(), block_data, row_pos); } if constexpr (!is_final) { // if is not final, we collect the agg results to input_block and then continue to insert diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 4ae92c2d2d8949..1e08891f3f8fe7 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -250,8 +250,8 @@ class MemTable { //return number of same keys size_t _sort(); Status _sort_by_cluster_keys(); - void _sort_one_column(std::vector& row_in_blocks, Tie& tie, - std::function cmp); + void _sort_one_column(std::vector>& row_in_blocks, Tie& tie, + std::function cmp); template void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, int row_pos); @@ -264,7 +264,7 @@ class MemTable { std::vector _agg_functions; std::vector _offsets_of_aggregate_states; size_t _total_size_of_aggregate_states; - std::vector _row_in_blocks; + std::vector> _row_in_blocks; size_t _num_columns; int32_t _seq_col_idx_in_block = -1;