diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index 82313f988cbb2b..0a97b116088ead 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -66,8 +66,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_ RETURN_IF_ERROR(_parse_variant_columns(flush_block)); } bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024; - if (config::enable_vertical_segment_writer && - _context.tablet_schema->cluster_key_idxes().empty()) { + if (config::enable_vertical_segment_writer) { std::unique_ptr writer; RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression)); RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows())); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index bdfcaba8b8eb99..99f7db3ab326ea 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -941,6 +941,12 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po } } RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); + } else { + LOG(WARNING) << "The segment does not need primary or short key index" + << ", table_id=" << _tablet_schema->table_id() + << ", keys_type=" << _tablet_schema->keys_type() + << ", cluster_key num=" << _tablet_schema->cluster_key_idxes().size(); + return Status::InternalError("The segment does not need primary or short key index"); } } diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index ba1bfcf353539f..cd9b0a352d6d35 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -98,17 +98,40 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 CHECK_NOTNULL(file_writer); _num_key_columns = _tablet_schema->num_key_columns(); _num_short_key_columns = _tablet_schema->num_short_key_columns(); - DCHECK(_num_key_columns >= _num_short_key_columns); + if (_tablet_schema->cluster_key_idxes().empty()) { + DCHECK(_num_key_columns >= _num_short_key_columns) + << ", table_id=" << _tablet_schema->table_id() + << ", num_key_columns=" << _num_key_columns + << ", num_short_key_columns=" << _num_short_key_columns + << ", cluster_key_columns=" << _tablet_schema->cluster_key_idxes().size(); + } for (size_t cid = 0; cid < _num_key_columns; ++cid) { const auto& column = _tablet_schema->column(cid); _key_coders.push_back(get_key_coder(column.type())); _key_index_size.push_back(column.index_length()); } // encode the sequence id into the primary key index - if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS && - _opts.enable_unique_key_merge_on_write) { - const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); - _seq_coder = get_key_coder(column.type()); + if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (_tablet_schema->has_sequence_col()) { + const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); + _seq_coder = get_key_coder(column.type()); + } + // encode the rowid into the primary key index + if (!_tablet_schema->cluster_key_idxes().empty()) { + const auto* type_info = get_scalar_type_info(); + _rowid_coder = get_key_coder(type_info->type()); + // primary keys + _primary_key_coders.swap(_key_coders); + // cluster keys + _key_coders.clear(); + _key_index_size.clear(); + _num_key_columns = _tablet_schema->cluster_key_idxes().size(); + for (auto cid : _tablet_schema->cluster_key_idxes()) { + const auto& column = _tablet_schema->column(cid); + _key_coders.push_back(get_key_coder(column.type())); + _key_index_size.push_back(column.index_length()); + } + } } if (_tablet_schema->has_inverted_index()) { _inverted_index_file_writer = std::make_unique( @@ -842,6 +865,7 @@ Status VerticalSegmentWriter::write_batch() { std::vector key_columns; vectorized::IOlapColumnDataAccessor* seq_column = nullptr; + std::map column_map; for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); for (auto& data : _batched_blocks) { @@ -853,10 +877,25 @@ Status VerticalSegmentWriter::write_batch() { if (!status.ok()) { return status; } - if (cid < _num_key_columns) { - key_columns.push_back(column); - } else if (_tablet_schema->has_sequence_col() && - cid == _tablet_schema->sequence_col_idx()) { + + if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { + if (cid < _tablet_schema->num_key_columns()) { + key_columns.push_back(column); + } + if (!_tablet_schema->cluster_key_idxes().empty()) { + for (auto id : _tablet_schema->cluster_key_idxes()) { + if (cid == id) { + column_map[cid] = column; + break; + } + } + } + } else { + if (cid < _num_key_columns) { + key_columns.push_back(column); + } + } + if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) { seq_column = column; } RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(), @@ -886,31 +925,32 @@ Status VerticalSegmentWriter::write_batch() { _short_key_row_pos += _opts.num_rows_per_block; short_key_pos.push_back(_short_key_row_pos - _num_rows_written); } - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { - // create primary indexes - std::string last_key; - for (size_t pos = 0; pos < data.num_rows; pos++) { - std::string key = _full_encode_keys(key_columns, pos); - _maybe_invalid_row_cache(key); - if (_tablet_schema->has_sequence_col()) { - _encode_seq_column(seq_column, pos, &key); - } - DCHECK(key.compare(last_key) > 0) - << "found duplicate key or key is not sorted! current key: " << key - << ", last key" << last_key; - RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); - last_key = std::move(key); + bool need_primary_key_indexes = _tablet_schema->keys_type() == UNIQUE_KEYS && + _opts.enable_unique_key_merge_on_write; + bool need_short_key_indexes = + !need_primary_key_indexes || + (need_primary_key_indexes && !_tablet_schema->cluster_key_idxes().empty()); + if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys + RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, + data.num_rows, false)); + } else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables + RETURN_IF_ERROR(_generate_short_key_index(key_columns, data.num_rows, short_key_pos)); + } else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys + // 1. generate primary key index + RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns, + seq_column, data.num_rows, true)); + // 2. generate short key index (use cluster key) + key_columns.clear(); + for (const auto& cid : _tablet_schema->cluster_key_idxes()) { + key_columns.push_back(column_map[cid]); } + RETURN_IF_ERROR(_generate_short_key_index(key_columns, data.num_rows, short_key_pos)); } else { - // create short key indexes' - // for min_max key - _set_min_key(_full_encode_keys(key_columns, 0)); - _set_max_key(_full_encode_keys(key_columns, data.num_rows - 1)); - - key_columns.resize(_num_short_key_columns); - for (const auto pos : short_key_pos) { - RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos))); - } + LOG(WARNING) << "The segment does not need primary or short key index" + << ", table_id=" << _tablet_schema->table_id() + << ", keys_type=" << _tablet_schema->keys_type() + << ", cluster_key num=" << _tablet_schema->cluster_key_idxes().size(); + return Status::InternalError("The segment does not need primary or short key index"); } _olap_data_convertor->clear_source_content(); _num_rows_written += data.num_rows; @@ -933,10 +973,81 @@ Status VerticalSegmentWriter::write_batch() { return Status::OK(); } +void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, string* encoded_keys) { + encoded_keys->push_back(KEY_NORMAL_MARKER); + _rowid_coder->full_encode_ascending(&rowid, encoded_keys); +} + +Status VerticalSegmentWriter::_generate_primary_key_index( + const std::vector& primary_key_coders, + const std::vector& primary_key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) { + if (!need_sort) { // mow table without cluster key + std::string last_key; + for (size_t pos = 0; pos < num_rows; pos++) { + // use _key_coders + std::string key = _full_encode_keys(primary_key_columns, pos); + _maybe_invalid_row_cache(key); + if (_tablet_schema->has_sequence_col()) { + _encode_seq_column(seq_column, pos, &key); + } + DCHECK(key.compare(last_key) > 0) + << "found duplicate key or key is not sorted! current key: " << key + << ", last key" << last_key; + RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); + last_key = std::move(key); + } + } else { // mow table with cluster key + // 1. generate primary keys in memory + std::vector primary_keys; + for (uint32_t pos = 0; pos < num_rows; pos++) { + std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos); + _maybe_invalid_row_cache(key); + if (_tablet_schema->has_sequence_col()) { + _encode_seq_column(seq_column, pos, &key); + } + _encode_rowid(pos, &key); + primary_keys.emplace_back(std::move(key)); + } + // 2. sort primary keys + std::sort(primary_keys.begin(), primary_keys.end()); + // 3. write primary keys index + std::string last_key; + for (const auto& key : primary_keys) { + DCHECK(key.compare(last_key) > 0) + << "found duplicate key or key is not sorted! current key: " << key + << ", last key" << last_key; + RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); + } + } + return Status::OK(); +} + +Status VerticalSegmentWriter::_generate_short_key_index( + std::vector& key_columns, size_t num_rows, + const std::vector& short_key_pos) { + // use _key_coders + _set_min_key(_full_encode_keys(key_columns, 0)); + _set_max_key(_full_encode_keys(key_columns, num_rows - 1)); + + key_columns.resize(_num_short_key_columns); + for (const auto pos : short_key_pos) { + RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos))); + } + return Status::OK(); +} + std::string VerticalSegmentWriter::_full_encode_keys( const std::vector& key_columns, size_t pos) { assert(_key_index_size.size() == _num_key_columns); assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns); + return _full_encode_keys(_key_coders, key_columns, pos); +} + +std::string VerticalSegmentWriter::_full_encode_keys( + const std::vector& key_coders, + const std::vector& key_columns, size_t pos) { + assert(key_columns.size() == key_coders.size()); std::string encoded_keys; size_t cid = 0; @@ -948,7 +1059,8 @@ std::string VerticalSegmentWriter::_full_encode_keys( continue; } encoded_keys.push_back(KEY_NORMAL_MARKER); - _key_coders[cid]->full_encode_ascending(field, &encoded_keys); + DCHECK(key_coders[cid] != nullptr); + key_coders[cid]->full_encode_ascending(field, &encoded_keys); ++cid; } return encoded_keys; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 8068b3e44be6c8..f4680675d78de5 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -144,6 +144,9 @@ class VerticalSegmentWriter { // used for unique-key with merge on write and segment min_max key std::string _full_encode_keys( const std::vector& key_columns, size_t pos); + std::string _full_encode_keys( + const std::vector& key_coders, + const std::vector& key_columns, size_t pos); // used for unique-key with merge on write void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos, string* encoded_keys); @@ -157,6 +160,13 @@ class VerticalSegmentWriter { const std::vector& use_default_or_null_flag, bool has_default_or_nullable, const size_t& segment_start_pos, const vectorized::Block* block); + Status _generate_primary_key_index( + const std::vector& primary_key_coders, + const std::vector& primary_key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort); + Status _generate_short_key_index(std::vector& key_columns, + size_t num_rows, const std::vector& short_key_pos); + void _encode_rowid(const uint32_t rowid, string* encoded_keys); private: uint32_t _segment_id; @@ -181,7 +191,10 @@ class VerticalSegmentWriter { std::unique_ptr _olap_data_convertor; // used for building short key index or primary key index during vectorized write. std::vector _key_coders; + // for mow table with cluster keys, this is primary keys + std::vector _primary_key_coders; const KeyCoder* _seq_coder = nullptr; + const KeyCoder* _rowid_coder = nullptr; std::vector _key_index_size; size_t _short_key_row_pos = 0; diff --git a/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy b/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy index c0cb1add123c99..f08d2fb1248c7d 100644 --- a/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy +++ b/regression-test/suites/unique_with_mow_c_p0/test_pk_uk_case.groovy @@ -26,7 +26,7 @@ import java.util.Map; import java.util.UUID; import java.time.format.DateTimeFormatter; -suite("test_pk_uk_case_cluster_key") { +suite("test_pk_uk_case") { def tableNamePk = "primary_key_pk_uk" def tableNameUk = "unique_key_pk_uk"