Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 30 additions & 32 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
}
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>();
_row_in_blocks = std::make_unique<DorisVector<std::shared_ptr<RowInBlock>>>();
}

void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -169,8 +169,6 @@ MemTable::~MemTable() {
}
}

std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(),
std::default_delete<RowInBlock>());
// Arena has to be destroyed after agg state, because some agg state's memory may be
// allocated in arena.
_arena.reset();
Expand Down Expand Up @@ -240,7 +238,7 @@ Status MemTable::insert(const vectorized::Block* input_block,
RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
row_idxs.data() + num_rows, &_column_offset));
for (int i = 0; i < num_rows; i++) {
_row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock + i});
_row_in_blocks->emplace_back(std::make_shared<RowInBlock>(cursor_in_mutableblock + i));
}

_stat.raw_rows += num_rows;
Expand Down Expand Up @@ -312,7 +310,7 @@ size_t MemTable::_sort() {
// sort new rows
Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size());
for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
};
_sort_one_column(*_row_in_blocks, tie, cmp);
Expand All @@ -323,16 +321,17 @@ size_t MemTable::_sort() {
while (iter.next()) {
pdqsort(std::next(_row_in_blocks->begin(), iter.left()),
std::next(_row_in_blocks->begin(), iter.right()),
[&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
[&is_dup](const std::shared_ptr<RowInBlock>& lhs,
const std::shared_ptr<RowInBlock>& rhs) -> bool {
return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos;
});
same_keys_num += iter.right() - iter.left();
}
// merge new rows and old rows
_vec_row_comparator->set_block(&_input_mutable_block);
auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l,
const RowInBlock* r) -> bool {
auto value = (*(this->_vec_row_comparator))(l, r);
auto cmp_func = [this, is_dup, &same_keys_num](const std::shared_ptr<RowInBlock>& l,
const std::shared_ptr<RowInBlock>& r) -> bool {
auto value = (*(this->_vec_row_comparator))(l.get(), r.get());
if (value == 0) {
same_keys_num++;
return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos;
Expand All @@ -356,14 +355,10 @@ Status MemTable::_sort_by_cluster_keys() {
auto clone_block = in_block.clone_without_columns();
_output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block);

DorisVector<RowInBlock*> row_in_blocks;
std::unique_ptr<int, std::function<void(int*)>> row_in_blocks_deleter((int*)0x01, [&](int*) {
std::for_each(row_in_blocks.begin(), row_in_blocks.end(),
std::default_delete<RowInBlock>());
});
DorisVector<std::shared_ptr<RowInBlock>> row_in_blocks;
row_in_blocks.reserve(mutable_block.rows());
for (size_t i = 0; i < mutable_block.rows(); i++) {
row_in_blocks.emplace_back(new RowInBlock {i});
row_in_blocks.emplace_back(std::make_shared<RowInBlock>(i));
}
Tie tie = Tie(0, mutable_block.rows());

Expand All @@ -384,9 +379,8 @@ Status MemTable::_sort_by_cluster_keys() {
while (iter.next()) {
pdqsort(std::next(row_in_blocks.begin(), iter.left()),
std::next(row_in_blocks.begin(), iter.right()),
[](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
return lhs->_row_pos < rhs->_row_pos;
});
[](const std::shared_ptr<RowInBlock>& lhs, const std::shared_ptr<RowInBlock>& rhs)
-> bool { return lhs->_row_pos < rhs->_row_pos; });
}

in_block = mutable_block.to_block();
Expand All @@ -405,16 +399,16 @@ Status MemTable::_sort_by_cluster_keys() {
row_pos_vec.data() + in_block.rows(), &column_offset);
}

void MemTable::_sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp) {
void MemTable::_sort_one_column(DorisVector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie,
std::function<int(RowInBlock*, RowInBlock*)> cmp) {
auto iter = tie.iter();
while (iter.next()) {
pdqsort(std::next(row_in_blocks.begin(), static_cast<int>(iter.left())),
std::next(row_in_blocks.begin(), static_cast<int>(iter.right())),
[&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; });
[&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), rhs.get()) < 0; });
tie[iter.left()] = 0;
for (auto i = iter.left() + 1; i < iter.right(); i++) {
tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0);
tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get()) == 0);
}
}
}
Expand Down Expand Up @@ -475,7 +469,7 @@ void MemTable::_aggregate() {
vectorized::MutableBlock::build_mutable_block(&in_block);
_vec_row_comparator->set_block(&mutable_block);
auto& block_data = in_block.get_columns_with_type_and_name();
DorisVector<RowInBlock*> temp_row_in_blocks;
DorisVector<std::shared_ptr<RowInBlock>> temp_row_in_blocks;
temp_row_in_blocks.reserve(_last_sorted_pos);
RowInBlock* prev_row = nullptr;
int row_pos = -1;
Expand All @@ -494,7 +488,8 @@ void MemTable::_aggregate() {
};

if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) {
for (RowInBlock* cur_row : *_row_in_blocks) {
for (const auto& cur_row_ptr : *_row_in_blocks) {
RowInBlock* cur_row = cur_row_ptr.get();
if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) {
if (!prev_row->has_init_agg()) {
init_for_agg(prev_row);
Expand All @@ -505,15 +500,16 @@ void MemTable::_aggregate() {
prev_row = cur_row;
if (!temp_row_in_blocks.empty()) {
// no more rows to merge for prev row, finalize it
_finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos);
_finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data,
row_pos);
}
temp_row_in_blocks.push_back(prev_row);
temp_row_in_blocks.push_back(cur_row_ptr);
row_pos++;
}
}
if (!temp_row_in_blocks.empty()) {
// finalize the last low
_finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos);
_finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos);
}
} else {
// For flexible partial update and the table has sequence column, considering the following situation:
Expand All @@ -539,8 +535,9 @@ void MemTable::_aggregate() {
row_without_seq_col = nullptr;
}
};
auto add_row = [&](RowInBlock* row, bool with_seq_col) {
temp_row_in_blocks.push_back(row);
auto add_row = [&](const std::shared_ptr<RowInBlock>& row_ptr, bool with_seq_col) {
RowInBlock* row = row_ptr.get();
temp_row_in_blocks.push_back(row_ptr);
row_pos++;
if (with_seq_col) {
row_with_seq_col = row;
Expand All @@ -553,7 +550,8 @@ void MemTable::_aggregate() {
auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>(
mutable_block.mutable_columns()[_skip_bitmap_col_idx].get())
->get_data();
for (auto* cur_row : *_row_in_blocks) {
for (const auto& cur_row_ptr : *_row_in_blocks) {
RowInBlock* cur_row = cur_row_ptr.get();
const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos];
bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id);
// compare keys, the keys of row_with_seq_col and row_with_seq_col is the same,
Expand All @@ -562,7 +560,7 @@ void MemTable::_aggregate() {
if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) {
prev_row = (with_seq_col ? row_with_seq_col : row_without_seq_col);
if (prev_row == nullptr) {
add_row(cur_row, with_seq_col);
add_row(cur_row_ptr, with_seq_col);
continue;
}
if (!prev_row->has_init_agg()) {
Expand All @@ -573,7 +571,7 @@ void MemTable::_aggregate() {
} else {
// no more rows to merge for prev rows, finalize them
finalize_rows();
add_row(cur_row, with_seq_col);
add_row(cur_row_ptr, with_seq_col);
}
}
// finalize the last lows
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ class MemTable {
//return number of same keys
size_t _sort();
Status _sort_by_cluster_keys();
void _sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
void _sort_one_column(DorisVector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie,
std::function<int(RowInBlock*, RowInBlock*)> cmp);
template <bool is_final>
void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data,
int row_pos);
Expand All @@ -263,7 +263,7 @@ class MemTable {
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks;
std::unique_ptr<DorisVector<std::shared_ptr<RowInBlock>>> _row_in_blocks;

size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
Expand Down
Loading