Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stream-load-vec]: memtable flush only if necessary after aggregated #9459

Merged
merged 9 commits into from
May 25, 2022
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ CONF_Int32(memory_max_alignment, "16");
// write buffer size before flush
CONF_mInt64(write_buffer_size, "209715200");

CONF_mInt64(memtable_max_buffer_size, "419430400");
spaces-X marked this conversation as resolved.
Show resolved Hide resolved

// following 2 configs limit the memory consumption of load process on a Backend.
// eg: memory limit to 80% of mem limit config but up to 100GB(default)
// NOTICE(cmy): set these default values very large because we don't want to
Expand Down
9 changes: 6 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,12 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
}

if (_mem_table->memory_usage() >= config::write_buffer_size) {
RETURN_NOT_OK(_flush_memtable_async());
_reset_mem_table();
if (_mem_table->need_to_agg()) {
_mem_table->shrink_memtable_by_agg();
if (_mem_table->is_flush()) {
RETURN_NOT_OK(_flush_memtable_async());
_reset_mem_table();
}
}

return Status::OK();
Expand Down
56 changes: 45 additions & 11 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,10 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
}
}
size_t cursor_in_mutableblock = _input_mutable_block.rows();
size_t oldsize = _input_mutable_block.allocated_bytes();
_input_mutable_block.add_rows(block, row_pos, num_rows);
size_t newsize = _input_mutable_block.allocated_bytes();
_mem_usage += newsize - oldsize;
_mem_tracker->consume(newsize - oldsize);
size_t input_size = block->allocated_bytes() * num_rows / block->rows();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what the logic means ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to calculate input size of block

_mem_usage += input_size;
_mem_tracker->consume(input_size);

for (int i = 0; i < num_rows; i++) {
_row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
Expand Down Expand Up @@ -242,7 +241,8 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
new_row->_row_pos, nullptr);
}
}
vectorized::Block MemTable::_collect_vskiplist_results() {
template <bool is_final>
void MemTable::_collect_vskiplist_results() {
VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
// TODO: should try to insert data by column, not by row. to opt the the code
Expand All @@ -251,6 +251,7 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
_output_mutable_block.add_row(&in_block, it.key()->_row_pos);
}
} else {
size_t idx = 0;
for (it.SeekToFirst(); it.Valid(); it.Next()) {
auto& block_data = in_block.get_columns_with_type_and_name();
// move key columns
Expand All @@ -263,11 +264,46 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
auto function = _agg_functions[i];
function->insert_result_into(it.key()->_agg_places[i],
*(_output_mutable_block.get_column_by_position(i)));
function->destroy(it.key()->_agg_places[i]);
if constexpr (is_final) {
function->destroy(it.key()->_agg_places[i]);
}
}
if constexpr (!is_final) {
// re-index the row_pos in VSkipList
it.key()->_row_pos = idx;
idx++;
}
}
if constexpr (!is_final) {
// if is not final, we collect the agg results to input_block and then continue to insert
size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
_mem_tracker->consume(shrunked_after_agg - _mem_usage);
_mem_usage = shrunked_after_agg;
_input_mutable_block.swap(_output_mutable_block);
//TODO(weixang):opt here.
std::unique_ptr<vectorized::Block> empty_input_block =
std::move(in_block.create_same_struct_block(0));
_output_mutable_block =
vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
_output_mutable_block.clear_column_data();
}
}
return _output_mutable_block.to_block();
}

void MemTable::shrink_memtable_by_agg() {
if (_keys_type == KeysType::DUP_KEYS) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DCHECK(_key_type != KeysType::DUP_KEYS)

the dup keys should not call this function

return;
}
_collect_vskiplist_results<false>();
}

bool MemTable::is_flush() {
return memory_usage() >= config::write_buffer_size;
}

bool MemTable::need_to_agg() {
return _keys_type == KeysType::DUP_KEYS ? is_flush()
: memory_usage() >= config::memtable_max_buffer_size;
}

Status MemTable::flush() {
Expand Down Expand Up @@ -301,10 +337,8 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
RETURN_NOT_OK(st);
}
} else {
vectorized::Block block = _collect_vskiplist_results();
// beta rowset flush parallel, segment write add block is not
// thread safe, so use tmp variable segment_write instead of
// member variable
_collect_vskiplist_results<true>();
vectorized::Block block = _output_mutable_block.to_block();
RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
_flush_size = block.allocated_bytes();
}
Expand Down
10 changes: 9 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class MemTable {
// insert tuple from (row_pos) to (row_pos+num_rows)
void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);

void shrink_memtable_by_agg();

bool is_flush();

bool need_to_agg();

/// Flush
Status flush();
Status close();
Expand Down Expand Up @@ -195,7 +201,9 @@ class MemTable {
//for vectorized
vectorized::MutableBlock _input_mutable_block;
vectorized::MutableBlock _output_mutable_block;
vectorized::Block _collect_vskiplist_results();

template <bool is_final>
void _collect_vskiplist_results();
bool _is_first_insertion;

void _init_agg_functions(const vectorized::Block* block);
Expand Down
19 changes: 19 additions & 0 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,17 @@ size_t MutableBlock::rows() const {
return 0;
}

void MutableBlock::swap(MutableBlock& another) noexcept {
_columns.swap(another._columns);
_data_types.swap(another._data_types);
}

void MutableBlock::swap(MutableBlock&& another) noexcept {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the function should not name swap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It kept the same name inclass block.

clear();
_columns = std::move(another._columns);
_data_types = std::move(another._data_types);
}

void MutableBlock::add_row(const Block* block, int row) {
auto& block_data = block->get_columns_with_type_and_name();
for (size_t i = 0; i < _columns.size(); ++i) {
Expand Down Expand Up @@ -973,4 +984,12 @@ size_t MutableBlock::allocated_bytes() const {
return res;
}

void MutableBlock::clear_column_data() noexcept {
for (auto& col : _columns) {
if (col) {
col->clear();
}
}
}

} // namespace doris::vectorized
7 changes: 7 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ class MutableBlock {

Block to_block(int start_column, int end_column);

void swap(MutableBlock& other) noexcept;

void swap(MutableBlock&& other) noexcept;

void add_row(const Block* block, int row);
void add_rows(const Block* block, const int* row_begin, const int* row_end);
void add_rows(const Block* block, size_t row_begin, size_t length);
Expand All @@ -435,6 +439,9 @@ class MutableBlock {
_columns.clear();
_data_types.clear();
}

void clear_column_data() noexcept;

size_t allocated_bytes() const;
};

Expand Down