diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 7e8d2fa0417a23..7b9521e775ca7a 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -51,18 +51,28 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
         // TODO: Support ZOrderComparator in the future
-        _vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(),
-                                      _keys_type == KeysType::DUP_KEYS);
+        _vec_skip_list = std::make_unique<VecTable>(
+                _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
     } else {
         _vec_skip_list = nullptr;
+        if (_keys_type == KeysType::DUP_KEYS) {
+            _insert_fn = &MemTable::_insert_dup;
+        } else {
+            _insert_fn = &MemTable::_insert_agg;
+        }
+        if (_tablet_schema->has_sequence_col()) {
+            _aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence;
+        } else {
+            _aggregate_two_row_fn = &MemTable::_aggregate_two_row;
+        }
         if (tablet_schema->sort_type() == SortType::ZORDER) {
             _row_comparator = std::make_shared<TupleRowZOrderComparator>(
                     _schema, tablet_schema->sort_col_num());
         } else {
             _row_comparator = std::make_shared<RowCursorComparator>(_schema);
         }
-        _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(),
-                               _keys_type == KeysType::DUP_KEYS);
+        _skip_list = std::make_unique<Table>(_row_comparator.get(), _table_mem_pool.get(),
+                                             _keys_type == KeysType::DUP_KEYS);
     }
 }
 
@@ -86,9 +96,6 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 }
 
 MemTable::~MemTable() {
-    delete _skip_list;
-    delete _vec_skip_list;
-
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
 }
@@ -158,37 +165,25 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     }
 }
 
-void MemTable::insert(const Tuple* tuple) {
+// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
+// we first allocate from _buffer_mem_pool, and then check whether it already exists in
+// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
+// otherwise, we need to copy it into _table_mem_pool before we can insert it.
+void MemTable::_insert_agg(const Tuple* tuple) {
     _rows++;
-    bool overwritten = false;
-    uint8_t* _tuple_buf = nullptr;
-    if (_keys_type == KeysType::DUP_KEYS) {
-        // Will insert directly, so use memory from _table_mem_pool
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow row(_schema, _tuple_buf);
-        _tuple_to_row(tuple, &row, _table_mem_pool.get());
-        _skip_list->Insert((TableKey)_tuple_buf, &overwritten);
-        DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-        return;
-    }
-
-    // For non-DUP models, for the data rows passed from the upper layer, when copying the data,
-    // we first allocate from _buffer_mem_pool, and then check whether it already exists in
-    // _skiplist. If it exists, we aggregate the new row into the row in skiplist.
-    // otherwise, we need to copy it into _table_mem_pool before we can insert it.
-    _tuple_buf = _buffer_mem_pool->allocate(_schema_size);
-    ContiguousRow src_row(_schema, _tuple_buf);
+    uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
+    ContiguousRow src_row(_schema, tuple_buf);
     _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());
 
-    bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
+    bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
     if (is_exist) {
-        _aggregate_two_row(src_row, _hint.curr->key);
+        (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
     } else {
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow dst_row(_schema, _tuple_buf);
+        tuple_buf = _table_mem_pool->allocate(_schema_size);
+        ContiguousRow dst_row(_schema, tuple_buf);
         _agg_object_pool.acquire_data(&_agg_buffer_pool);
         copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
-        _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
+        _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
     }
 
     // Make MemPool to be reusable, but does not free its memory
@@ -196,6 +191,16 @@ void MemTable::insert(const Tuple* tuple) {
     _agg_buffer_pool.clear();
 }
 
+void MemTable::_insert_dup(const Tuple* tuple) {
+    _rows++;
+    bool overwritten = false;
+    uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
+    ContiguousRow row(_schema, tuple_buf);
+    _tuple_to_row(tuple, &row, _table_mem_pool.get());
+    _skip_list->Insert((TableKey)tuple_buf, &overwritten);
+    DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
+}
+
 void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
     for (size_t i = 0; i < _slot_descs->size(); ++i) {
         auto cell = row->cell(i);
@@ -209,12 +214,14 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me
 
 void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
     ContiguousRow dst_row(_schema, row_in_skiplist);
-    if (_tablet_schema->has_sequence_col()) {
-        agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
-                                     _table_mem_pool.get());
-    } else {
-        agg_update_row(&dst_row, src_row, _table_mem_pool.get());
-    }
+    agg_update_row(&dst_row, src_row, _table_mem_pool.get());
+}
+
+void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
+                                                TableKey row_in_skiplist) {
+    ContiguousRow dst_row(_schema, row_in_skiplist);
+    agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
+                                 _table_mem_pool.get());
 }
 
 void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
@@ -236,7 +243,7 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
     }
 }
 vectorized::Block MemTable::_collect_vskiplist_results() {
-    VecTable::Iterator it(_vec_skip_list);
+    VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
     // TODO: should try to insert data by column, not by row. to opt the the code
     if (_keys_type == KeysType::DUP_KEYS) {
@@ -282,7 +289,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
         if (st == Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED)) {
             // For alpha rowset, we do not implement "flush_single_memtable".
             // Flush the memtable like the old way.
-            Table::Iterator it(_skip_list);
+            Table::Iterator it(_skip_list.get());
             for (it.SeekToFirst(); it.Valid(); it.Next()) {
                 char* row = (char*)it.key();
                 ContiguousRow dst_row(_schema, row);
@@ -307,7 +314,7 @@ Status MemTable::close() {
 }
 
 MemTable::Iterator::Iterator(MemTable* memtable)
-        : _mem_table(memtable), _it(memtable->_skip_list) {}
+        : _mem_table(memtable), _it(memtable->_skip_list.get()) {}
 
 void MemTable::Iterator::seek_to_first() {
     _it.SeekToFirst();
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index ed3cbc5fa329c9..4922bb62cdf477 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -50,7 +50,7 @@ class MemTable {
     size_t memory_usage() const { return _mem_tracker->consumption(); }
     std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
 
-    void insert(const Tuple* tuple);
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
 
@@ -140,6 +140,9 @@ class MemTable {
 private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _insert_dup(const Tuple* tuple);
+    void _insert_agg(const Tuple* tuple);
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
@@ -171,10 +174,10 @@ class MemTable {
     ObjectPool _agg_object_pool;
 
     size_t _schema_size;
-    Table* _skip_list;
+    std::unique_ptr<Table> _skip_list;
    Table::Hint _hint;
 
-    VecTable* _vec_skip_list;
+    std::unique_ptr<VecTable> _vec_skip_list;
     VecTable::Hint _vec_hint;
 
     RowsetWriter* _rowset_writer;
@@ -185,6 +188,9 @@ class MemTable {
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggragate key model.
     int64_t _rows = 0;
+    void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
+    void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
+                                            TableKey row_in_skiplist) = nullptr;
 
     //for vectorized
     vectorized::MutableBlock _input_mutable_block;
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index a52dde719a7cba..f8c67e5e210e32 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -23,9 +23,9 @@
 #include "util/slice.h" // for Slice
 #include "vec/columns/column.h"
 #include "vec/columns/column_dictionary.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/column_string.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
 #include "vec/columns/predicate_column.h"
 
 namespace doris {
@@ -66,6 +66,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
         const Slice* src = reinterpret_cast<const Slice*>(vals);
         size_t num_added = 0;
         uint32_t value_code = -1;
+        auto* actual_builder =
+                down_cast<BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>*>(_data_page_builder.get());
 
         if (_data_page_builder->count() == 0) {
             _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
@@ -90,13 +92,18 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
                     dict_item.relocate(item_mem);
                 }
                 value_code = _dictionary.size();
+                size_t add_count = 1;
+                RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item),
+                                                   &add_count));
+                if (add_count == 0) {
+                    // current dict page is full, stop processing remaining inputs
+                    break;
+                }
                 _dictionary.emplace(dict_item, value_code);
-                _dict_items.push_back(dict_item);
-                _dict_builder->update_prepared_size(dict_item.size);
             }
             size_t add_count = 1;
-            RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code),
-                                                    &add_count));
+            RETURN_IF_ERROR(actual_builder->single_add(
+                    reinterpret_cast<const uint8_t*>(&value_code), &add_count));
             if (add_count == 0) {
                 // current data page is full, stop processing remaining inputs
                 break;
@@ -144,17 +151,7 @@ uint64_t BinaryDictPageBuilder::size() const {
 }
 
 Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
-    _dictionary.clear();
-    _dict_builder->reset();
-    size_t add_count = 1;
-    // here do not check is_page_full of dict_builder
-    // because it is checked in add
-    for (auto& dict_item : _dict_items) {
-        RETURN_IF_ERROR(
-                _dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count));
-    }
     *dictionary_page = _dict_builder->finish();
-    _dict_items.clear();
     return Status::OK();
 }
 
@@ -180,10 +177,7 @@ Status BinaryDictPageBuilder::get_last_value(void* value) const {
     }
     uint32_t value_code;
     RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code));
-    // TODO _dict_items is cleared in get_dictionary_page, which could cause
-    // get_last_value to fail when it's called after get_dictionary_page.
-    // the solution is to read last value from _dict_builder instead of _dict_items
-    *reinterpret_cast<Slice*>(value) = _dict_items[value_code];
+    *reinterpret_cast<Slice*>(value) = _dict_builder->get(value_code);
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h
index e655baca9c7ce8..e8c8f2a18edaec 100644
--- a/be/src/olap/rowset/segment_v2/binary_plain_page.h
+++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h
@@ -47,14 +47,13 @@ namespace segment_v2 {
 class BinaryPlainPageBuilder : public PageBuilder {
 public:
     BinaryPlainPageBuilder(const PageBuilderOptions& options)
-            : _size_estimate(0), _prepared_size(0), _options(options) {
+            : _size_estimate(0), _options(options) {
         reset();
     }
 
     bool is_page_full() override {
         // data_page_size is 0, do not limit the page size
-        return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size ||
-                                                _prepared_size > _options.data_page_size);
+        return _options.data_page_size != 0 && _size_estimate > _options.data_page_size;
     }
 
     Status add(const uint8_t* vals, size_t* count) override {
@@ -101,7 +100,6 @@ class BinaryPlainPageBuilder : public PageBuilder {
         _buffer.clear();
         _buffer.reserve(_options.data_page_size == 0 ? 1024 : _options.data_page_size);
         _size_estimate = sizeof(uint32_t);
-        _prepared_size = sizeof(uint32_t);
         _finished = false;
         _last_value_size = 0;
     }
@@ -127,11 +125,16 @@ class BinaryPlainPageBuilder : public PageBuilder {
         return Status::OK();
     }
 
-    void update_prepared_size(size_t added_size) {
-        _prepared_size += added_size;
-        _prepared_size += sizeof(uint32_t);
+    inline Slice operator[](size_t idx) const {
+        DCHECK(!_finished);
+        DCHECK_LT(idx, _offsets.size());
+        size_t value_size =
+                (idx < _offsets.size() - 1) ? _offsets[idx + 1] - _offsets[idx] : _last_value_size;
+        return Slice(&_buffer[_offsets[idx]], value_size);
     }
 
+    inline Slice get(std::size_t idx) const { return (*this)[idx]; }
+
 private:
     void _copy_value_at(size_t idx, faststring* value) const {
         size_t value_size =
@@ -141,7 +144,6 @@ class BinaryPlainPageBuilder : public PageBuilder {
 
     faststring _buffer;
     size_t _size_estimate;
-    size_t _prepared_size;
     // Offsets of each entry, relative to the start of the page
     std::vector<uint32_t> _offsets;
     bool _finished;
diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
index ceadcf65796962..50edb30c2a32a3 100644
--- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h
+++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -93,13 +93,48 @@ class BitshufflePageBuilder : public PageBuilder {
     bool is_page_full() override { return _remain_element_capacity == 0; }
 
     Status add(const uint8_t* vals, size_t* count) override {
+        return add_internal<false>(vals, count);
+    }
+
+    Status single_add(const uint8_t* vals, size_t* count) {
+        return add_internal<true>(vals, count);
+    }
+
+    template <bool single>
+    inline Status add_internal(const uint8_t* vals, size_t* count) {
         DCHECK(!_finished);
+        if (_remain_element_capacity <= 0) {
+            *count = 0;
+            return Status::RuntimeError("page is full.");
+        }
         int to_add = std::min<int>(_remain_element_capacity, *count);
-        _data.append(vals, to_add * SIZE_OF_TYPE);
+        int to_add_size = to_add * SIZE_OF_TYPE;
+        size_t orig_size = _data.size();
+        _data.resize(orig_size + to_add_size);
         _count += to_add;
         _remain_element_capacity -= to_add;
         // return added number through count
         *count = to_add;
+        if constexpr (single) {
+            if constexpr (SIZE_OF_TYPE == 1) {
+                *reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 2) {
+                *reinterpret_cast<uint16_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint16_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 4) {
+                *reinterpret_cast<uint32_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint32_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 8) {
+                *reinterpret_cast<uint64_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint64_t*>(vals);
+                return Status::OK();
+            }
+        }
+        // when single is true and SIZE_OF_TYPE > 8 or single is false
+        memcpy(&_data[orig_size], vals, to_add_size);
         return Status::OK();
     }