diff --git a/be/src/common/config.h b/be/src/common/config.h index d171af23ecf474..d38c80f219b4c7 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -464,6 +464,9 @@ CONF_Int32(memory_max_alignment, "16"); // write buffer size before flush CONF_mInt64(write_buffer_size, "209715200"); +// max buffer size used in memtable for the aggregated table +CONF_mInt64(memtable_max_buffer_size, "419430400"); + // following 2 configs limit the memory consumption of load process on a Backend. // eg: memory limit to 80% of mem limit config but up to 100GB(default) // NOTICE(cmy): set these default values very large because we don't want to diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index b40677e2b044ae..abf6dffa31b847 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -222,9 +222,12 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector } } - if (_mem_table->memory_usage() >= config::write_buffer_size) { - RETURN_NOT_OK(_flush_memtable_async()); - _reset_mem_table(); + if (_mem_table->need_to_agg()) { + _mem_table->shrink_memtable_by_agg(); + if (_mem_table->is_flush()) { + RETURN_NOT_OK(_flush_memtable_async()); + _reset_mem_table(); + } } return Status::OK(); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 769169a8ac6338..163ca551b5611a 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -126,11 +126,10 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num } } size_t cursor_in_mutableblock = _input_mutable_block.rows(); - size_t oldsize = _input_mutable_block.allocated_bytes(); _input_mutable_block.add_rows(block, row_pos, num_rows); - size_t newsize = _input_mutable_block.allocated_bytes(); - _mem_usage += newsize - oldsize; - _mem_tracker->consume(newsize - oldsize); + size_t input_size = block->allocated_bytes() * num_rows / block->rows(); + _mem_usage += input_size; + _mem_tracker->consume(input_size); for (int i = 0; i < num_rows; i++) { _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); @@ -242,7 +241,8 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_ new_row->_row_pos, nullptr); } } -vectorized::Block MemTable::_collect_vskiplist_results() { +template +void MemTable::_collect_vskiplist_results() { VecTable::Iterator it(_vec_skip_list.get()); vectorized::Block in_block = _input_mutable_block.to_block(); // TODO: should try to insert data by column, not by row. to opt the the code @@ -251,6 +251,7 @@ vectorized::Block MemTable::_collect_vskiplist_results() { _output_mutable_block.add_row(&in_block, it.key()->_row_pos); } } else { + size_t idx = 0; for (it.SeekToFirst(); it.Valid(); it.Next()) { auto& block_data = in_block.get_columns_with_type_and_name(); // move key columns @@ -263,11 +264,46 @@ vectorized::Block MemTable::_collect_vskiplist_results() { auto function = _agg_functions[i]; function->insert_result_into(it.key()->_agg_places[i], *(_output_mutable_block.get_column_by_position(i))); - function->destroy(it.key()->_agg_places[i]); + if constexpr (is_final) { + function->destroy(it.key()->_agg_places[i]); + } } + if constexpr (!is_final) { + // re-index the row_pos in VSkipList + it.key()->_row_pos = idx; + idx++; + } + } + if constexpr (!is_final) { + // if is not final, we collect the agg results to input_block and then continue to insert + size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); + _mem_tracker->consume(shrunked_after_agg - _mem_usage); + _mem_usage = shrunked_after_agg; + _input_mutable_block.swap(_output_mutable_block); + //TODO(weixang):opt here. + std::unique_ptr empty_input_block = + std::move(in_block.create_same_struct_block(0)); + _output_mutable_block = + vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); + _output_mutable_block.clear_column_data(); } } - return _output_mutable_block.to_block(); +} + +void MemTable::shrink_memtable_by_agg() { + if (_keys_type == KeysType::DUP_KEYS) { + return; + } + _collect_vskiplist_results(); +} + +bool MemTable::is_flush() { + return memory_usage() >= config::write_buffer_size; +} + +bool MemTable::need_to_agg() { + return _keys_type == KeysType::DUP_KEYS ? is_flush() + : memory_usage() >= config::memtable_max_buffer_size; } Status MemTable::flush() { @@ -301,10 +337,8 @@ Status MemTable::_do_flush(int64_t& duration_ns) { RETURN_NOT_OK(st); } } else { - vectorized::Block block = _collect_vskiplist_results(); - // beta rowset flush parallel, segment write add block is not - // thread safe, so use tmp variable segment_write instead of - // member variable + _collect_vskiplist_results(); + vectorized::Block block = _output_mutable_block.to_block(); RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block)); _flush_size = block.allocated_bytes(); } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 910cd92270d755..c73b39b39da990 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -54,6 +54,12 @@ class MemTable { // insert tuple from (row_pos) to (row_pos+num_rows) void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows); + void shrink_memtable_by_agg(); + + bool is_flush(); + + bool need_to_agg(); + /// Flush Status flush(); Status close(); @@ -195,7 +201,9 @@ class MemTable { //for vectorized vectorized::MutableBlock _input_mutable_block; vectorized::MutableBlock _output_mutable_block; - vectorized::Block _collect_vskiplist_results(); + + template + void _collect_vskiplist_results(); bool _is_first_insertion; void _init_agg_functions(const vectorized::Block* block); diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 8d5880531ab26b..d63438be2ddc1d 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -846,6 +846,17 @@ size_t MutableBlock::rows() const { return 0; } +void MutableBlock::swap(MutableBlock& another) noexcept { + _columns.swap(another._columns); + _data_types.swap(another._data_types); +} + +void MutableBlock::swap(MutableBlock&& another) noexcept { + clear(); + _columns = std::move(another._columns); + _data_types = std::move(another._data_types); +} + void MutableBlock::add_row(const Block* block, int row) { auto& block_data = block->get_columns_with_type_and_name(); for (size_t i = 0; i < _columns.size(); ++i) { @@ -973,4 +984,12 @@ size_t MutableBlock::allocated_bytes() const { return res; } +void MutableBlock::clear_column_data() noexcept { + for (auto& col : _columns) { + if (col) { + col->clear(); + } + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 375ef6906f6988..0e0a0f69e73b13 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -425,6 +425,10 @@ class MutableBlock { Block to_block(int start_column, int end_column); + void swap(MutableBlock& other) noexcept; + + void swap(MutableBlock&& other) noexcept; + void add_row(const Block* block, int row); void add_rows(const Block* block, const int* row_begin, const int* row_end); void add_rows(const Block* block, size_t row_begin, size_t length); @@ -435,6 +439,9 @@ class MutableBlock { _columns.clear(); _data_types.clear(); } + + void clear_column_data() noexcept; + size_t allocated_bytes() const; };