Skip to content

Commit

Permalink
[enhancement](load) optimize load string data and dict page write (ap…
Browse files Browse the repository at this point in the history
…ache#9123)

* [enhancement](load) optimize load string data and dict page write
  • Loading branch information
yangzhg authored and starocean999 committed May 19, 2022
1 parent bc4fba5 commit 24a5809
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 72 deletions.
87 changes: 47 additions & 40 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,28 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
_skip_list = nullptr;
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
// TODO: Support ZOrderComparator in the future
_vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(),
_keys_type == KeysType::DUP_KEYS);
_vec_skip_list = std::make_unique<VecTable>(
_vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
} else {
_vec_skip_list = nullptr;
if (_keys_type == KeysType::DUP_KEYS) {
_insert_fn = &MemTable::_insert_dup;
} else {
_insert_fn = &MemTable::_insert_agg;
}
if (_tablet_schema->has_sequence_col()) {
_aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence;
} else {
_aggregate_two_row_fn = &MemTable::_aggregate_two_row;
}
if (tablet_schema->sort_type() == SortType::ZORDER) {
_row_comparator = std::make_shared<TupleRowZOrderComparator>(
_schema, tablet_schema->sort_col_num());
} else {
_row_comparator = std::make_shared<RowCursorComparator>(_schema);
}
_skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(),
_keys_type == KeysType::DUP_KEYS);
_skip_list = std::make_unique<Table>(_row_comparator.get(), _table_mem_pool.get(),
_keys_type == KeysType::DUP_KEYS);
}
}

Expand All @@ -86,9 +96,6 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}

MemTable::~MemTable() {
delete _skip_list;
delete _vec_skip_list;

std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
_mem_tracker->release(_mem_usage);
}
Expand Down Expand Up @@ -158,44 +165,42 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
}
}

void MemTable::insert(const Tuple* tuple) {
// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
// we first allocate from _buffer_mem_pool, and then check whether it already exists in
// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
// otherwise, we need to copy it into _table_mem_pool before we can insert it.
void MemTable::_insert_agg(const Tuple* tuple) {
_rows++;
bool overwritten = false;
uint8_t* _tuple_buf = nullptr;
if (_keys_type == KeysType::DUP_KEYS) {
// Will insert directly, so use memory from _table_mem_pool
_tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow row(_schema, _tuple_buf);
_tuple_to_row(tuple, &row, _table_mem_pool.get());
_skip_list->Insert((TableKey)_tuple_buf, &overwritten);
DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
return;
}

// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
// we first allocate from _buffer_mem_pool, and then check whether it already exists in
// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
// otherwise, we need to copy it into _table_mem_pool before we can insert it.
_tuple_buf = _buffer_mem_pool->allocate(_schema_size);
ContiguousRow src_row(_schema, _tuple_buf);
uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
ContiguousRow src_row(_schema, tuple_buf);
_tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());

bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
if (is_exist) {
_aggregate_two_row(src_row, _hint.curr->key);
(this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
} else {
_tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow dst_row(_schema, _tuple_buf);
tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow dst_row(_schema, tuple_buf);
_agg_object_pool.acquire_data(&_agg_buffer_pool);
copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
_skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
_skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
}

// Make MemPool to be reusable, but does not free its memory
_buffer_mem_pool->clear();
_agg_buffer_pool.clear();
}

void MemTable::_insert_dup(const Tuple* tuple) {
_rows++;
bool overwritten = false;
uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow row(_schema, tuple_buf);
_tuple_to_row(tuple, &row, _table_mem_pool.get());
_skip_list->Insert((TableKey)tuple_buf, &overwritten);
DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
}

void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
for (size_t i = 0; i < _slot_descs->size(); ++i) {
auto cell = row->cell(i);
Expand All @@ -209,12 +214,14 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me

void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
ContiguousRow dst_row(_schema, row_in_skiplist);
if (_tablet_schema->has_sequence_col()) {
agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
_table_mem_pool.get());
} else {
agg_update_row(&dst_row, src_row, _table_mem_pool.get());
}
agg_update_row(&dst_row, src_row, _table_mem_pool.get());
}

void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
TableKey row_in_skiplist) {
ContiguousRow dst_row(_schema, row_in_skiplist);
agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
_table_mem_pool.get());
}

void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
Expand All @@ -236,7 +243,7 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
}
}
vectorized::Block MemTable::_collect_vskiplist_results() {
VecTable::Iterator it(_vec_skip_list);
VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
// TODO: should try to insert data by column, not by row. to opt the the code
if (_keys_type == KeysType::DUP_KEYS) {
Expand Down Expand Up @@ -282,7 +289,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
if (st == Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED)) {
// For alpha rowset, we do not implement "flush_single_memtable".
// Flush the memtable like the old way.
Table::Iterator it(_skip_list);
Table::Iterator it(_skip_list.get());
for (it.SeekToFirst(); it.Valid(); it.Next()) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
Expand All @@ -307,7 +314,7 @@ Status MemTable::close() {
}

MemTable::Iterator::Iterator(MemTable* memtable)
: _mem_table(memtable), _it(memtable->_skip_list) {}
: _mem_table(memtable), _it(memtable->_skip_list.get()) {}

void MemTable::Iterator::seek_to_first() {
_it.SeekToFirst();
Expand Down
12 changes: 9 additions & 3 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class MemTable {
size_t memory_usage() const { return _mem_tracker->consumption(); }
std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }

void insert(const Tuple* tuple);
inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
// insert tuple from (row_pos) to (row_pos+num_rows)
void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);

Expand Down Expand Up @@ -140,6 +140,9 @@ class MemTable {
private:
void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist);
void _insert_dup(const Tuple* tuple);
void _insert_agg(const Tuple* tuple);
// for vectorized
void _insert_one_row_from_block(RowInBlock* row_in_block);
void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
Expand Down Expand Up @@ -171,10 +174,10 @@ class MemTable {
ObjectPool _agg_object_pool;

size_t _schema_size;
Table* _skip_list;
std::unique_ptr<Table> _skip_list;
Table::Hint _hint;

VecTable* _vec_skip_list;
std::unique_ptr<VecTable> _vec_skip_list;
VecTable::Hint _vec_hint;

RowsetWriter* _rowset_writer;
Expand All @@ -185,6 +188,9 @@ class MemTable {
// This is not the rows in this memtable, because rows may be merged
// in unique or aggragate key model.
int64_t _rows = 0;
void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
TableKey row_in_skiplist) = nullptr;

//for vectorized
vectorized::MutableBlock _input_mutable_block;
Expand Down
34 changes: 14 additions & 20 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
#include "util/slice.h" // for Slice
#include "vec/columns/column.h"
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/predicate_column.h"

namespace doris {
Expand Down Expand Up @@ -66,6 +66,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
const Slice* src = reinterpret_cast<const Slice*>(vals);
size_t num_added = 0;
uint32_t value_code = -1;
auto* actual_builder =
down_cast<BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>*>(_data_page_builder.get());

if (_data_page_builder->count() == 0) {
_first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
Expand All @@ -90,13 +92,18 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
dict_item.relocate(item_mem);
}
value_code = _dictionary.size();
size_t add_count = 1;
RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item),
&add_count));
if (add_count == 0) {
// current dict page is full, stop processing remaining inputs
break;
}
_dictionary.emplace(dict_item, value_code);
_dict_items.push_back(dict_item);
_dict_builder->update_prepared_size(dict_item.size);
}
size_t add_count = 1;
RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code),
&add_count));
RETURN_IF_ERROR(actual_builder->single_add(
reinterpret_cast<const uint8_t*>(&value_code), &add_count));
if (add_count == 0) {
// current data page is full, stop processing remaining inputs
break;
Expand Down Expand Up @@ -144,17 +151,7 @@ uint64_t BinaryDictPageBuilder::size() const {
}

Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
_dictionary.clear();
_dict_builder->reset();
size_t add_count = 1;
// here do not check is_page_full of dict_builder
// because it is checked in add
for (auto& dict_item : _dict_items) {
RETURN_IF_ERROR(
_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count));
}
*dictionary_page = _dict_builder->finish();
_dict_items.clear();
return Status::OK();
}

Expand All @@ -180,10 +177,7 @@ Status BinaryDictPageBuilder::get_last_value(void* value) const {
}
uint32_t value_code;
RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code));
// TODO _dict_items is cleared in get_dictionary_page, which could cause
// get_last_value to fail when it's called after get_dictionary_page.
// the solution is to read last value from _dict_builder instead of _dict_items
*reinterpret_cast<Slice*>(value) = _dict_items[value_code];
*reinterpret_cast<Slice*>(value) = _dict_builder->get(value_code);
return Status::OK();
}

Expand Down
18 changes: 10 additions & 8 deletions be/src/olap/rowset/segment_v2/binary_plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ namespace segment_v2 {
class BinaryPlainPageBuilder : public PageBuilder {
public:
BinaryPlainPageBuilder(const PageBuilderOptions& options)
: _size_estimate(0), _prepared_size(0), _options(options) {
: _size_estimate(0), _options(options) {
reset();
}

bool is_page_full() override {
// data_page_size is 0, do not limit the page size
return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size ||
_prepared_size > _options.data_page_size);
return _options.data_page_size != 0 && _size_estimate > _options.data_page_size;
}

Status add(const uint8_t* vals, size_t* count) override {
Expand Down Expand Up @@ -101,7 +100,6 @@ class BinaryPlainPageBuilder : public PageBuilder {
_buffer.clear();
_buffer.reserve(_options.data_page_size == 0 ? 1024 : _options.data_page_size);
_size_estimate = sizeof(uint32_t);
_prepared_size = sizeof(uint32_t);
_finished = false;
_last_value_size = 0;
}
Expand All @@ -127,11 +125,16 @@ class BinaryPlainPageBuilder : public PageBuilder {
return Status::OK();
}

void update_prepared_size(size_t added_size) {
_prepared_size += added_size;
_prepared_size += sizeof(uint32_t);
inline Slice operator[](size_t idx) const {
DCHECK(!_finished);
DCHECK_LT(idx, _offsets.size());
size_t value_size =
(idx < _offsets.size() - 1) ? _offsets[idx + 1] - _offsets[idx] : _last_value_size;
return Slice(&_buffer[_offsets[idx]], value_size);
}

inline Slice get(std::size_t idx) const { return (*this)[idx]; }

private:
void _copy_value_at(size_t idx, faststring* value) const {
size_t value_size =
Expand All @@ -141,7 +144,6 @@ class BinaryPlainPageBuilder : public PageBuilder {

faststring _buffer;
size_t _size_estimate;
size_t _prepared_size;
// Offsets of each entry, relative to the start of the page
std::vector<uint32_t> _offsets;
bool _finished;
Expand Down
37 changes: 36 additions & 1 deletion be/src/olap/rowset/segment_v2/bitshuffle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,48 @@ class BitshufflePageBuilder : public PageBuilder {
bool is_page_full() override { return _remain_element_capacity == 0; }

Status add(const uint8_t* vals, size_t* count) override {
return add_internal<false>(vals, count);
}

Status single_add(const uint8_t* vals, size_t* count) {
return add_internal<true>(vals, count);
}

template <bool single>
inline Status add_internal(const uint8_t* vals, size_t* count) {
DCHECK(!_finished);
if (_remain_element_capacity <= 0) {
*count = 0;
return Status::RuntimeError("page is full.");
}
int to_add = std::min<int>(_remain_element_capacity, *count);
_data.append(vals, to_add * SIZE_OF_TYPE);
int to_add_size = to_add * SIZE_OF_TYPE;
size_t orig_size = _data.size();
_data.resize(orig_size + to_add_size);
_count += to_add;
_remain_element_capacity -= to_add;
// return added number through count
*count = to_add;
if constexpr (single) {
if constexpr (SIZE_OF_TYPE == 1) {
*reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
return Status::OK();
} else if constexpr (SIZE_OF_TYPE == 2) {
*reinterpret_cast<uint16_t*>(&_data[orig_size]) =
*reinterpret_cast<const uint16_t*>(vals);
return Status::OK();
} else if constexpr (SIZE_OF_TYPE == 4) {
*reinterpret_cast<uint32_t*>(&_data[orig_size]) =
*reinterpret_cast<const uint32_t*>(vals);
return Status::OK();
} else if constexpr (SIZE_OF_TYPE == 8) {
*reinterpret_cast<uint64_t*>(&_data[orig_size]) =
*reinterpret_cast<const uint64_t*>(vals);
return Status::OK();
}
}
// when single is true and SIZE_OF_TYPE > 8 or single is false
memcpy(&_data[orig_size], vals, to_add_size);
return Status::OK();
}

Expand Down

0 comments on commit 24a5809

Please sign in to comment.