Skip to content

Commit

Permalink
[fix](cluster key) cluster key support vertical_segment_writer
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Jul 30, 2024
1 parent 8ecc249 commit 2f6dbff
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 36 deletions.
3 changes: 1 addition & 2 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
RETURN_IF_ERROR(_parse_variant_columns(flush_block));
}
bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
if (config::enable_vertical_segment_writer &&
_context.tablet_schema->cluster_key_idxes().empty()) {
if (config::enable_vertical_segment_writer) {
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,12 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
}
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else {
LOG(WARNING) << "The segment does not need primary or short key index"
<< ", table_id=" << _tablet_schema->table_id()
<< ", keys_type=" << _tablet_schema->keys_type()
<< ", cluster_key num=" << _tablet_schema->cluster_key_idxes().size();
return Status::InternalError("The segment does not need primary or short key index");
}
}

Expand Down
178 changes: 145 additions & 33 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,40 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
CHECK_NOTNULL(file_writer);
_num_key_columns = _tablet_schema->num_key_columns();
_num_short_key_columns = _tablet_schema->num_short_key_columns();
DCHECK(_num_key_columns >= _num_short_key_columns);
if (_tablet_schema->cluster_key_idxes().empty()) {
DCHECK(_num_key_columns >= _num_short_key_columns)
<< ", table_id=" << _tablet_schema->table_id()
<< ", num_key_columns=" << _num_key_columns
<< ", num_short_key_columns=" << _num_short_key_columns
<< ", cluster_key_columns=" << _tablet_schema->cluster_key_idxes().size();
}
for (size_t cid = 0; cid < _num_key_columns; ++cid) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
_key_index_size.push_back(column.index_length());
}
// encode the sequence id into the primary key index
if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write) {
const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
_seq_coder = get_key_coder(column.type());
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (_tablet_schema->has_sequence_col()) {
const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
_seq_coder = get_key_coder(column.type());
}
// encode the rowid into the primary key index
if (!_tablet_schema->cluster_key_idxes().empty()) {
const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
_rowid_coder = get_key_coder(type_info->type());
// primary keys
_primary_key_coders.swap(_key_coders);
// cluster keys
_key_coders.clear();
_key_index_size.clear();
_num_key_columns = _tablet_schema->cluster_key_idxes().size();
for (auto cid : _tablet_schema->cluster_key_idxes()) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
_key_index_size.push_back(column.index_length());
}
}
}
if (_tablet_schema->has_inverted_index()) {
_inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
Expand Down Expand Up @@ -842,6 +865,7 @@ Status VerticalSegmentWriter::write_batch() {

std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
std::map<uint32_t, vectorized::IOlapColumnDataAccessor*> column_map;
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
for (auto& data : _batched_blocks) {
Expand All @@ -853,10 +877,25 @@ Status VerticalSegmentWriter::write_batch() {
if (!status.ok()) {
return status;
}
if (cid < _num_key_columns) {
key_columns.push_back(column);
} else if (_tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {

if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
if (cid < _tablet_schema->num_key_columns()) {
key_columns.push_back(column);
}
if (!_tablet_schema->cluster_key_idxes().empty()) {
for (auto id : _tablet_schema->cluster_key_idxes()) {
if (cid == id) {
column_map[cid] = column;
break;
}
}
}
} else {
if (cid < _num_key_columns) {
key_columns.push_back(column);
}
}
if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) {
seq_column = column;
}
RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
Expand Down Expand Up @@ -886,31 +925,32 @@ Status VerticalSegmentWriter::write_batch() {
_short_key_row_pos += _opts.num_rows_per_block;
short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
}
if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) {
// create primary indexes
std::string last_key;
for (size_t pos = 0; pos < data.num_rows; pos++) {
std::string key = _full_encode_keys(key_columns, pos);
_maybe_invalid_row_cache(key);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
DCHECK(key.compare(last_key) > 0)
<< "found duplicate key or key is not sorted! current key: " << key
<< ", last key" << last_key;
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
last_key = std::move(key);
bool need_primary_key_indexes = _tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write;
bool need_short_key_indexes =
!need_primary_key_indexes ||
(need_primary_key_indexes && !_tablet_schema->cluster_key_idxes().empty());
if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
data.num_rows, false));
} else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables
RETURN_IF_ERROR(_generate_short_key_index(key_columns, data.num_rows, short_key_pos));
} else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys
// 1. generate primary key index
RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns,
seq_column, data.num_rows, true));
// 2. generate short key index (use cluster key)
key_columns.clear();
for (const auto& cid : _tablet_schema->cluster_key_idxes()) {
key_columns.push_back(column_map[cid]);
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, data.num_rows, short_key_pos));
} else {
// create short key indexes'
// for min_max key
_set_min_key(_full_encode_keys(key_columns, 0));
_set_max_key(_full_encode_keys(key_columns, data.num_rows - 1));

key_columns.resize(_num_short_key_columns);
for (const auto pos : short_key_pos) {
RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos)));
}
LOG(WARNING) << "The segment does not need primary or short key index"
<< ", table_id=" << _tablet_schema->table_id()
<< ", keys_type=" << _tablet_schema->keys_type()
<< ", cluster_key num=" << _tablet_schema->cluster_key_idxes().size();
return Status::InternalError("The segment does not need primary or short key index");
}
_olap_data_convertor->clear_source_content();
_num_rows_written += data.num_rows;
Expand All @@ -933,10 +973,81 @@ Status VerticalSegmentWriter::write_batch() {
return Status::OK();
}

void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, string* encoded_keys) {
encoded_keys->push_back(KEY_NORMAL_MARKER);
_rowid_coder->full_encode_ascending(&rowid, encoded_keys);
}

Status VerticalSegmentWriter::_generate_primary_key_index(
const std::vector<const KeyCoder*>& primary_key_coders,
const std::vector<vectorized::IOlapColumnDataAccessor*>& primary_key_columns,
vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) {
if (!need_sort) { // mow table without cluster key
std::string last_key;
for (size_t pos = 0; pos < num_rows; pos++) {
// use _key_coders
std::string key = _full_encode_keys(primary_key_columns, pos);
_maybe_invalid_row_cache(key);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
DCHECK(key.compare(last_key) > 0)
<< "found duplicate key or key is not sorted! current key: " << key
<< ", last key" << last_key;
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
last_key = std::move(key);
}
} else { // mow table with cluster key
// 1. generate primary keys in memory
std::vector<std::string> primary_keys;
for (uint32_t pos = 0; pos < num_rows; pos++) {
std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos);
_maybe_invalid_row_cache(key);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
_encode_rowid(pos, &key);
primary_keys.emplace_back(std::move(key));
}
// 2. sort primary keys
std::sort(primary_keys.begin(), primary_keys.end());
// 3. write primary keys index
std::string last_key;
for (const auto& key : primary_keys) {
DCHECK(key.compare(last_key) > 0)
<< "found duplicate key or key is not sorted! current key: " << key
<< ", last key" << last_key;
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
}
}
return Status::OK();
}

Status VerticalSegmentWriter::_generate_short_key_index(
std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t num_rows,
const std::vector<size_t>& short_key_pos) {
// use _key_coders
_set_min_key(_full_encode_keys(key_columns, 0));
_set_max_key(_full_encode_keys(key_columns, num_rows - 1));

key_columns.resize(_num_short_key_columns);
for (const auto pos : short_key_pos) {
RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos)));
}
return Status::OK();
}

std::string VerticalSegmentWriter::_full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos) {
assert(_key_index_size.size() == _num_key_columns);
assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns);
return _full_encode_keys(_key_coders, key_columns, pos);
}

std::string VerticalSegmentWriter::_full_encode_keys(
const std::vector<const KeyCoder*>& key_coders,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos) {
assert(key_columns.size() == key_coders.size());

std::string encoded_keys;
size_t cid = 0;
Expand All @@ -948,7 +1059,8 @@ std::string VerticalSegmentWriter::_full_encode_keys(
continue;
}
encoded_keys.push_back(KEY_NORMAL_MARKER);
_key_coders[cid]->full_encode_ascending(field, &encoded_keys);
DCHECK(key_coders[cid] != nullptr);
key_coders[cid]->full_encode_ascending(field, &encoded_keys);
++cid;
}
return encoded_keys;
Expand Down
13 changes: 13 additions & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ class VerticalSegmentWriter {
// used for unique-key with merge on write and segment min_max key
std::string _full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos);
std::string _full_encode_keys(
const std::vector<const KeyCoder*>& key_coders,
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos);
// used for unique-key with merge on write
void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos,
string* encoded_keys);
Expand All @@ -157,6 +160,13 @@ class VerticalSegmentWriter {
const std::vector<bool>& use_default_or_null_flag,
bool has_default_or_nullable, const size_t& segment_start_pos,
const vectorized::Block* block);
Status _generate_primary_key_index(
const std::vector<const KeyCoder*>& primary_key_coders,
const std::vector<vectorized::IOlapColumnDataAccessor*>& primary_key_columns,
vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort);
Status _generate_short_key_index(std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t num_rows, const std::vector<size_t>& short_key_pos);
void _encode_rowid(const uint32_t rowid, string* encoded_keys);

private:
uint32_t _segment_id;
Expand All @@ -181,7 +191,10 @@ class VerticalSegmentWriter {
std::unique_ptr<vectorized::OlapBlockDataConvertor> _olap_data_convertor;
// used for building short key index or primary key index during vectorized write.
std::vector<const KeyCoder*> _key_coders;
// for mow table with cluster keys, this is primary keys
std::vector<const KeyCoder*> _primary_key_coders;
const KeyCoder* _seq_coder = nullptr;
const KeyCoder* _rowid_coder = nullptr;
std::vector<uint16_t> _key_index_size;
size_t _short_key_row_pos = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.Map;
import java.util.UUID;
import java.time.format.DateTimeFormatter;

suite("test_pk_uk_case_cluster_key") {
suite("test_pk_uk_case") {
def tableNamePk = "primary_key_pk_uk"
def tableNameUk = "unique_key_pk_uk"

Expand Down

0 comments on commit 2f6dbff

Please sign in to comment.