diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index ecbe80304098db6..ab617eb57ac2af6 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -154,6 +154,17 @@ Status Compaction::do_compaction_impl(int64_t permits) { // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool Merger::Statistics stats; Status res; + TabletReader::ReaderParams reader_params; + reader_params.tablet = _tablet; + reader_params.reader_type = compaction_type(); + reader_params.rs_readers = _input_rs_readers; + reader_params.version = _output_rs_writer->version(); + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->speed_up_unique_key_with_aux_index()) { + stats.rowid_conversion = &_rowid_conversion; + reader_params.record_rowids = true; + reader_params.delete_bitmap = &_tablet->tablet_meta()->delete_bitmap(); + } if (use_vectorized_compaction) { res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema, @@ -242,6 +253,14 @@ Status Compaction::modify_rowsets() { std::vector output_rowsets; output_rowsets.push_back(_output_rowset); std::lock_guard wrlock(_tablet->get_header_lock()); + + // update dst rowset delete bitmap + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->speed_up_unique_key_with_aux_index()) { + _tablet->tablet_meta()->update_delete_bitmap(_input_rowsets, _output_rs_writer->version(), + _rowid_conversion); + } + RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); _tablet->save_meta(); return Status::OK(); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 4dcb4a21b181b7b..7d7a43bfd37988a 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -95,6 +95,7 @@ class Compaction { int64_t _oldest_write_timestamp; int64_t _newest_write_timestamp; + RowIdConversion _rowid_conversion; DISALLOW_COPY_AND_ASSIGN(Compaction); }; diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h 
index b2db75a54ae1c68..0ce0d011de0c3c0 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -24,6 +24,7 @@ #include "olap/column_predicate.h" #include "olap/olap_common.h" #include "olap/tablet_schema.h" +#include "olap/rowid_conversion.h" #include "vec/core/block.h" namespace doris { @@ -86,6 +87,7 @@ class StorageReadOptions { int block_row_max = 4096; const TabletSchema* tablet_schema = nullptr; + bool record_rowids = false; }; // Used to read data in RowBlockV2 one by one @@ -113,6 +115,10 @@ class RowwiseIterator { return Status::NotSupported("to be implemented"); } + virtual Status current_batch_segment_rowids(std::vector* block_segment_rowids) { + return Status::NotSupported("to be implemented"); + } + // return schema for this Iterator virtual const Schema& schema() const = 0; diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index b1c712532f41b4f..bab1ba521191e1e 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -105,6 +105,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, reader_params.tablet_schema = cur_tablet_schema; const auto& schema = *cur_tablet_schema; + // rows id conversion can works in UNIQUE_KEYS and DUP_KEYS model + // only UNIQUE_KEYS model need rows id conversion + reader_params.record_rowids = tablet->keys_type() == KeysType::UNIQUE_KEYS; + reader_params.return_columns.resize(schema.num_columns()); std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0); reader_params.origin_return_columns = &reader_params.return_columns; @@ -113,6 +117,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, vectorized::Block block = schema.create_block(reader_params.return_columns); size_t output_rows = 0; bool eof = false; + RowIdConversion rowid_conversion; while (!eof) { // Read one block from block reader RETURN_NOT_OK_LOG( @@ -121,6 +126,11 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType 
reader_type, RETURN_NOT_OK_LOG( dst_rowset_writer->add_block(&block), "failed to write block when merging rowsets of tablet " + tablet->full_name()); + + if (reader_params.record_rowids && block.rows() > 0) { + stats_output->rowid_conversion->add(reader.current_block_row_locations()); + } + output_rows += block.rows(); block.clear_column_data(); } @@ -135,6 +145,19 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, dst_rowset_writer->flush(), "failed to flush rowset when merging rowsets of tablet " + tablet->full_name()); + if (reader_params.record_rowids) { + // rowid_conversion set segment rows number of destination rowset + std::vector segment_num_rows; + RETURN_NOT_OK(dst_rowset_writer->get_segment_num_rows(&segment_num_rows)); + rowid_conversion.set_dst_segment_num_rows(dst_rowset_writer->rowset_id(), segment_num_rows); + + // get row id conversion here for delete bitmap + // use exmaple: + // RowsetSegmentRowId src(rowset_id, segment_id, row_id); + // RowsetSegmentRowId dst; + // rowid_conversion.get(src, &dst); + } + return Status::OK(); } diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index b1da44702c67dbb..d0c868082e057ff 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -225,6 +225,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.batch_size = _batch_size; _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS; _reader_context.merged_rows = &_merged_rows; + _reader_context.record_rowids = read_params.record_rowids; *valid_rs_readers = *rs_readers; diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index fd9de829402b840..456eee424c60914 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -90,6 +90,9 @@ class TabletReader { std::vector* origin_return_columns = nullptr; std::unordered_set* tablet_columns_convert_to_null_set = nullptr; + // used for comapction to record row ids + bool record_rowids = false; + void 
check_validation() const; std::string to_string() const;
diff --git a/be/src/olap/rowid_conversion.h b/be/src/olap/rowid_conversion.h
new file mode 100644
index 000000000000000..28927204aa712c5
--- /dev/null
+++ b/be/src/olap/rowid_conversion.h
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "olap/olap_common.h"
+
+namespace doris {
+
+// Translates row locations in source rowsets into row locations in the
+// destination rowset the rows were written to.
+//
+// Call order, as exercised by the unit test added with this class:
+//   1. init_segment_map() for every source rowset;
+//   2. add() with the source location of every row, in output order;
+//   3. set_dst_segment_num_rows() with the destination segment sizes;
+//   4. get() to translate a source location.
+// NOTE(review): the template arguments in this header were restored from how
+// the values are used; confirm the element types against the callers.
+class RowIdConversion {
+public:
+    RowIdConversion() = default;
+    ~RowIdConversion() = default;
+
+    // Creates one row-id map per segment of src_rowset_id; num_rows[i] is the
+    // row count of segment i. Every entry starts as UINT64_MAX (not mapped).
+    void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
+        for (size_t i = 0; i < num_rows.size(); i++) {
+            _segments_rowid_map[{src_rowset_id, i}].resize(num_rows[i], UINT64_MAX);
+        }
+    }
+
+    // Maps each given source row to the next row id of the destination rowset:
+    // the k-th row passed to add() overall becomes destination row k.
+    // Each item must address a segment set up by init_segment_map() and a row
+    // id below that segment's row count.
+    void add(const std::vector<RowLocation>& rss_row_ids) {
+        for (const auto& item : rss_row_ids) {
+            RowIdMap& rowid_map = _segments_rowid_map[{item.rowset_id, item.segment_id}];
+            DCHECK_LT(item.row_id, rowid_map.size());
+            rowid_map[item.row_id] = _dst_rowset_num_rows;
+            _dst_rowset_num_rows++;
+        }
+    }
+
+    // Translates src into its location in the destination rowset.
+    // Returns 0 and fills *dst on success. Returns -1 and leaves *dst untouched
+    // when src is unknown, was never passed to add(), or is not covered by the
+    // segment sizes given to set_dst_segment_num_rows().
+    int get(const RowLocation& src, RowLocation* dst) const {
+        auto iter = _segments_rowid_map.find({src.rowset_id, src.segment_id});
+        if (iter == _segments_rowid_map.end()) {
+            return -1;
+        }
+        const RowIdMap& rowid_map = iter->second;
+        if (src.row_id >= rowid_map.size()) {
+            return -1;
+        }
+        const uint64_t dst_rowset_rowid = rowid_map[src.row_id];
+        if (dst_rowset_rowid == UINT64_MAX) {
+            return -1;
+        }
+
+        // The first segment whose cumulative row count exceeds the rowset-wide
+        // row id holds the row; the previous cumulative count is its first row.
+        uint64_t segment_first_rowid = 0;
+        for (SegmentId i = 0; i < _dst_rowset_segment_cumu_num_rows.size(); i++) {
+            if (dst_rowset_rowid < _dst_rowset_segment_cumu_num_rows[i]) {
+                dst->rowset_id = _dst_rowset_id;
+                dst->segment_id = i;
+                dst->row_id = dst_rowset_rowid - segment_first_rowid;
+                return 0;
+            }
+            segment_first_rowid = _dst_rowset_segment_cumu_num_rows[i];
+        }
+        return -1;
+    }
+
+    // Sets the destination rowset id and the row count of each of its
+    // segments; the counts are stored as a running (cumulative) total.
+    void set_dst_segment_num_rows(const RowsetId& dst_rowset_rowid,
+                                  const std::vector<uint32_t>& segment_num_rows) {
+        _dst_rowset_id = dst_rowset_rowid;
+        uint64_t sum = 0;
+        for (auto num_rows : segment_num_rows) {
+            sum += num_rows;
+            _dst_rowset_segment_cumu_num_rows.push_back(sum);
+        }
+        DCHECK_EQ(sum, _dst_rowset_num_rows);
+    }
+
+private:
+    using RowId = uint32_t;
+    using SegmentId = uint32_t;
+    // index: row id in the source segment, value: row id in the destination rowset
+    // UINT64_MAX indicates not exist
+    using RowIdMap = std::vector<uint64_t>;
+    // key: (source rowset id, source segment id)
+    using SegmentsRowIdMap = std::map<std::pair<RowsetId, SegmentId>, RowIdMap>;
+
+    SegmentsRowIdMap _segments_rowid_map;
+
+    // cumulative row counts of the destination rowset's segments
+    std::vector<uint64_t> _dst_rowset_segment_cumu_num_rows;
+
+    RowsetId _dst_rowset_id;
+    // number of rows added to the destination rowset so far
+    uint64_t _dst_rowset_num_rows = 0;
+};
+
+} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index c4d36492f37df89..e924517a19af157 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -84,6 +84,7 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) { } read_options.use_page_cache = read_context->use_page_cache; read_options.tablet_schema = read_context->tablet_schema; + read_options.record_rowids = read_context->record_rowids; // load segments RETURN_NOT_OK(SegmentLoader::instance()->load_segments( diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index a6c7caf57c6da48..233031e630880ab 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -56,6 +56,10 @@ class BetaRowsetReader : public RowsetReader { RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } + Status current_batch_segment_rowids(std::vector* block_segment_rowids) override { + return _iterator->current_batch_segment_rowids(block_segment_rowids); + } + private: bool _should_push_down_value_predicates() const; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 0a8d84ae314c14f..b46bde66ced7e79 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -305,6
+305,7 @@ Status BetaRowsetWriter::_create_segment_writer( } Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr* writer) { + _segment_num_rows.push_back((*writer)->num_rows_written()); if ((*writer)->num_rows_written() == 0) { return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index a66981aef7ec868..b190a130fe671c9 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -65,6 +65,11 @@ class BetaRowsetWriter : public RowsetWriter { RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } + Status get_segment_num_rows(std::vector* segment_num_rows) const { + *segment_num_rows = _segment_num_rows; + return Status::OK(); + } + private: template Status _add_row(const RowType& row); @@ -95,6 +100,9 @@ class BetaRowsetWriter : public RowsetWriter { bool _is_pending = false; bool _already_built = false; + + // record rows number of every segment + std::vector _segment_num_rows; }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index d3dcb30d767cf0e..804fbeb5f6f7540 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -22,6 +22,7 @@ #include #include "gen_cpp/olap_file.pb.h" +#include "olap/rowid_conversion.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader_context.h" #include "vec/core/block.h" @@ -64,6 +65,9 @@ class RowsetReader { virtual int64_t oldest_write_timestamp() = 0; virtual int64_t newest_write_timestamp() = 0; + virtual Status current_batch_segment_rowids(std::vector* block_segment_rowids) { + return Status::NotSupported("to be implemented"); + } }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 24379b188cf0b92..ee3dcfb994ef0af 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ 
b/be/src/olap/rowset/rowset_reader_context.h @@ -66,6 +66,7 @@ struct RowsetReaderContext { bool is_unique = false; //record row num merged in generic iterator uint64_t* merged_rows = nullptr; + bool record_rowids = false; }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index dfe5186ad80989d..114baae9f818902 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -76,6 +76,10 @@ class RowsetWriter { virtual RowsetTypePB type() const = 0; + virtual Status get_segment_num_rows(std::vector* segment_num_rows) const { + return Status::NotSupported("to be implemented"); + } + private: DISALLOW_COPY_AND_ASSIGN(RowsetWriter); }; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 3ab94f0d9bb1348..70936a4d4ee2257 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -972,7 +972,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { if (UNLIKELY(!_inited)) { RETURN_IF_ERROR(_init(true)); _inited = true; - if (_lazy_materialization_read) { + if (_lazy_materialization_read || _opts.record_rowids) { _block_rowids.resize(_opts.block_row_max); } _current_return_columns.resize(_schema.columns().size()); @@ -1000,18 +1000,19 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { _init_current_block(block, _current_return_columns); - uint32_t nrows_read = 0; + _current_batch_rows_read = 0; uint32_t nrows_read_limit = _opts.block_row_max; if (UNLIKELY(_estimate_row_size)) { // read 100 rows to estimate average row size nrows_read_limit = 100; } - _read_columns_by_index(nrows_read_limit, nrows_read, _lazy_materialization_read); + _read_columns_by_index(nrows_read_limit, _current_batch_rows_read, + _lazy_materialization_read || _opts.record_rowids); _opts.stats->blocks_load += 1; - _opts.stats->raw_rows_read += nrows_read; + 
_opts.stats->raw_rows_read += _current_batch_rows_read; - if (nrows_read == 0) { + if (_current_batch_rows_read == 0) { for (int i = 0; i < block->columns(); i++) { auto cid = _schema.column_id(i); // todo(wb) abstract make column where @@ -1027,7 +1028,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { _output_non_pred_columns(block); } else { _convert_dict_code_for_predicate_if_necessary(); - uint16_t selected_size = nrows_read; + uint16_t selected_size = _current_batch_rows_read; uint16_t sel_rowid_idx[selected_size]; // step 1: evaluate vectorization predicate @@ -1088,5 +1089,18 @@ void SegmentIterator::_update_max_row(const vectorized::Block* block) { _opts.block_row_max = std::min(block_row_max, _opts.block_row_max); } +// used for comapction, so no need to process predicate column +Status SegmentIterator::current_batch_segment_rowids( + std::vector* block_segment_rowids) { + DCHECK(_opts.record_rowids); + DCHECK_GE(_block_rowids.size(), _current_batch_rows_read); + block_segment_rowids->resize(_current_batch_rows_read); + uint32_t sid = segment_id(); + for (auto i = 0; i < _current_batch_rows_read; i++) { + (*block_segment_rowids)[i] = SegmentRowId(sid, _block_rowids[i]); + } + return Status::OK(); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 64dd5c13411cc00..a0bd6b1c12d294a 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -61,6 +61,8 @@ class SegmentIterator : public RowwiseIterator { bool is_lazy_materialization_read() const override { return _lazy_materialization_read; } uint64_t data_id() const override { return _segment->id(); } + Status current_batch_segment_rowids(std::vector* block_segment_rowids) override; + private: Status _init(bool is_vec = false); @@ -213,6 +215,9 @@ class SegmentIterator : public RowwiseIterator { // char_type 
columns cid std::vector _char_type_idx; + + // number of rows read in the current batch + uint32_t _current_batch_rows_read = 0; }; } // namespace segment_v2 diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index abef78014bb6a8c..68d9841ebcd5342 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -167,6 +167,12 @@ Status BlockReader::_direct_next_block(Block* block, MemPool* mem_pool, ObjectPo return res; } *eof = res.precise_code() == OLAP_ERR_DATA_EOF; + if (UNLIKELY(_reader_context.record_rowids)) { + res = _vcollect_iter.current_block_row_locations(&_block_row_locations); + if (UNLIKELY(!res.ok() && res != Status::OLAPInternalError(OLAP_ERR_DATA_EOF))) { + return res; + } + } return Status::OK(); } @@ -234,9 +240,16 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje auto target_block_row = 0; auto target_columns = block->mutate_columns(); + if (UNLIKELY(_reader_context.record_rowids)) { + _block_rowset_segment_rowids.resize(_batch_size); + } do { _insert_data_normal(target_columns); + if (UNLIKELY(_reader_context.record_rowids)) { + _block_rowset_segment_rowids[target_block_row] = + _vcollect_iter.current_rowset_segment_rowid(); + } target_block_row++; // the version is in reverse order, the first row is the highest version, @@ -254,6 +267,10 @@ Status BlockReader::_unique_key_next_block(Block* block, MemPool* mem_pool, Obje } } while (target_block_row < _batch_size); + if (UNLIKELY(_reader_context.record_rowids)) { + _block_rowset_segment_rowids.resize(block->rows()); + } + return Status::OK(); } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index ad2d27d1d8cdb70..04f8bb10eccc338 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -45,6 +45,10 @@ class BlockReader final : public TabletReader { return (this->*_next_block_func)(block, mem_pool, agg_pool, eof); } + std::vector 
current_block_rowset_segment_rowids() { + return _block_rowset_segment_rowids; + } + private: // Directly read row from rowset and pass to upper caller. No need to do aggregation. // This is usually used for DUPLICATE KEY tables @@ -101,6 +105,8 @@ class BlockReader final : public TabletReader { Status (BlockReader::*_next_block_func)(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) = nullptr; + + std::vector _block_rowset_segment_rowids; }; } // namespace vectorized diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 81eb20f3a28eedd..6c88887179f4899 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -211,6 +211,10 @@ Status VCollectIterator::Level0Iterator::_refresh_current_row() { _ref.row_pos = -1; return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); } + + if (UNLIKELY(_reader->_reader_context.record_rowids)) { + RETURN_NOT_OK(_rs_reader->current_block_row_locations(&_block_row_locations)); + } } } while (_block->rows() != 0); _ref.row_pos = -1; @@ -235,12 +239,34 @@ Status VCollectIterator::Level0Iterator::next(Block* block) { return res; } if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 0) { + _block_row_locations.clear(); return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); } + if (UNLIKELY(_reader->_reader_context.record_rowids)) { + RETURN_NOT_OK(_rs_reader->current_block_row_locations(&_block_row_locations)); + } return Status::OK(); } } +RowsetSegmentRowId VCollectIterator::Level0Iterator::current_rowset_segment_rowid() { + SegmentRowId& segment_row_id = _block_segment_rowids[_ref.row_pos]; + return RowsetSegmentRowId(_rs_reader->rowset()->rowset_id(), segment_row_id.segment_id, + segment_row_id.row_id); +} + +Status VCollectIterator::Level0Iterator::current_batch_rowset_segment_rowids( + std::vector* block_rowset_segment_rowids) { + block_rowset_segment_rowids->resize(_block_segment_rowids.size()); + for (auto i = 0; i < 
_block_segment_rowids.size(); i++) { + SegmentRowId& segment_row_id = _block_segment_rowids[i]; + (*block_rowset_segment_rowids)[i] = + RowsetSegmentRowId(_rs_reader->rowset()->rowset_id(), segment_row_id.segment_id, + segment_row_id.row_id); + } + return Status::OK(); +} + VCollectIterator::Level1Iterator::Level1Iterator( const std::list& children, TabletReader* reader, bool merge, bool skip_same) @@ -406,6 +432,9 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { auto target_columns = block->mutate_columns(); size_t column_count = block->columns(); IteratorRowRef cur_row = _ref; + if (UNLIKELY(_reader->_reader_context.record_rowids)) { + _block_rowset_segment_rowids.resize(_batch_size); + } do { const auto& src_block = cur_row.block; assert(src_block->columns() == column_count); @@ -413,9 +442,16 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { target_columns[i]->insert_from(*(src_block->get_by_position(i).column), cur_row.row_pos); } + if (UNLIKELY(_reader->_reader_context.record_rowids)) { + _block_rowset_segment_rowids[target_block_row] = + _cur_child->current_rowset_segment_rowid(); + } ++target_block_row; auto res = _merge_next(&cur_row); if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) { + if (UNLIKELY(_reader->_reader_context.record_rowids)) { + _block_row_locations.resize(target_block_row); + } return res; } @@ -450,5 +486,19 @@ Status VCollectIterator::Level1Iterator::_normal_next(Block* block) { } } +Status VCollectIterator::Level1Iterator::current_block_row_locations( + std::vector* block_row_locations) { + if (!_merge) { + if (UNLIKELY(_cur_child == nullptr)) { + return Status::OLAPInternalError(OLAP_ERR_DATA_EOF); + } + return _cur_child->current_block_row_locations(block_row_locations); + } else { + DCHECK(_reader->_reader_context.record_rowids); + *block_row_locations = _block_row_locations; + return Status::OK(); + } +} + } // namespace vectorized } // namespace doris diff --git 
a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h index 4c8002d88520095..50240212b2f1a62 100644 --- a/be/src/vec/olap/vcollect_iterator.h +++ b/be/src/vec/olap/vcollect_iterator.h @@ -25,6 +25,7 @@ #include "olap/olap_define.h" #include "olap/reader.h" +#include "olap/rowid_conversion.h" #include "olap/rowset/rowset_reader.h" #include "vec/core/block.h" @@ -64,6 +65,15 @@ class VCollectIterator { bool is_merge() const { return _merge; } + RowsetSegmentRowId current_rowset_segment_rowid() { + return _inner_iter->current_rowset_segment_rowid(); + } + + Status current_batch_rowset_segment_rowids( + std::vector* block_rowset_segment_rowids) { + return _inner_iter->current_batch_rowset_segment_rowids(block_rowset_segment_rowids); + } + private: // This interface is the actual implementation of the new version of iterator. // It currently contains two implementations, one is Level0Iterator, @@ -93,6 +103,11 @@ class VCollectIterator { const TabletSchema& tablet_schema() const { return _schema; }; + virtual RowsetSegmentRowId current_rowset_segment_rowid() = 0; + + virtual Status current_batch_rowset_segment_rowids( + std::vector* block_rowset_segment_rowids) = 0; + protected: const TabletSchema& _schema; IteratorRowRef _ref; @@ -132,12 +147,18 @@ class VCollectIterator { Status next(Block* block) override; + RowsetSegmentRowId current_rowset_segment_rowid() override; + + Status current_batch_rowset_segment_rowids( + std::vector* block_rowset_segment_rowids) override; + private: Status _refresh_current_row(); RowsetReaderSharedPtr _rs_reader; TabletReader* _reader = nullptr; std::shared_ptr _block; + std::vector _block_segment_rowids; }; // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) @@ -154,6 +175,12 @@ class VCollectIterator { Status next(Block* block) override; + RowsetSegmentRowId current_rowset_segment_rowid() override { + return _cur_child->current_rowset_segment_rowid(); + } + + Status 
current_block_row_locations(std::vector* block_row_locations) override; + ~Level1Iterator(); private: @@ -186,6 +213,8 @@ class VCollectIterator { // batch size, get from TabletReader int _batch_size; + + std::vector _block_rowset_segment_rowids; }; std::unique_ptr _inner_iter; diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index b831edde07625fb..e0ae0f4144c7322 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -196,6 +196,11 @@ class VMergeIteratorContext { } } + SegmentRowId get_current_segment_rowid() { + DCHECK(_record_rowids); + return _block_segment_rowids[_index_in_block]; + } + // Advance internal row index to next valid row // Return error if error happens // Don't call this when valid() is false, action is undefined @@ -229,10 +234,13 @@ class VMergeIteratorContext { int _block_row_max = 4096; int _num_columns; int _num_key_columns; + std::vector _block_segment_rowids; + bool _record_rowids = false; }; Status VMergeIteratorContext::init(const StorageReadOptions& opts) { _block_row_max = opts.block_row_max; + _record_rowids = opts.record_rowids; RETURN_IF_ERROR(_iter->init(opts)); RETURN_IF_ERROR(block_reset()); RETURN_IF_ERROR(_load_next_block()); @@ -268,6 +276,12 @@ Status VMergeIteratorContext::_load_next_block() { return st; } } + if (UNLIKELY(_record_rowids)) { + st = _iter->current_batch_segment_rowids(&_block_segment_rowids); + if (!st.ok()) { + return st; + } + } } while (_block.rows() == 0); _index_in_block = -1; _valid = true; @@ -298,6 +312,12 @@ class VMergeIterator : public RowwiseIterator { const Schema& schema() const override { return *_schema; } + Status current_batch_segment_rowids(std::vector* block_segment_rowids) override { + DCHECK(_record_rowids); + *block_segment_rowids = _block_segment_rowids; + return Status::OK(); + } + private: // It will be released after '_merge_heap' has been built. 
std::vector _origin_iters; @@ -320,6 +340,8 @@ class VMergeIterator : public RowwiseIterator { int _sequence_id_idx = -1; bool _is_unique = false; uint64_t* _merged_rows = nullptr; + bool _record_rowids = false; + std::vector _block_segment_rowids; }; Status VMergeIterator::init(const StorageReadOptions& opts) { @@ -327,6 +349,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { return Status::OK(); } _schema = &(*_origin_iters.begin())->schema(); + _record_rowids = opts.record_rowids; for (auto iter : _origin_iters) { auto ctx = std::make_unique(iter, _sequence_id_idx, _is_unique); @@ -345,6 +368,10 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { } Status VMergeIterator::next_batch(vectorized::Block* block) { + if (UNLIKELY(_record_rowids)) { + _block_segment_rowids.resize(block_row_max); + } + size_t row_idx = 0; while (block->rows() < block_row_max) { if (_merge_heap.empty()) break; @@ -356,6 +383,9 @@ Status VMergeIterator::next_batch(vectorized::Block* block) { ctx->copy_row(block); } else if (_merged_rows != nullptr) { (*_merged_rows)++; + if (UNLIKELY(_record_rowids)) { + _block_segment_rowids[row_idx] = ctx->get_current_segment_rowid(); + } } RETURN_IF_ERROR(ctx->advance()); @@ -370,6 +400,11 @@ Status VMergeIterator::next_batch(vectorized::Block* block) { return Status::OK(); } // Still last batch needs to be processed + + if (UNLIKELY(_record_rowids)) { + _block_segment_rowids.resize(row_idx); + } + return Status::EndOfFile("no more data in segment"); } @@ -392,6 +427,14 @@ class VUnionIterator : public RowwiseIterator { const Schema& schema() const override { return *_schema; } + Status current_batch_segment_rowids(std::vector* block_segment_rowids) override { + if (!_cur_iter) { + block_segment_rowids->clear(); + return Status::EndOfFile("End of VUnionIterator"); + } + return _cur_iter->current_batch_segment_rowids(block_segment_rowids); + } + private: const Schema* _schema = nullptr; RowwiseIterator* _cur_iter = nullptr; 
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index cdc39b74366281c..036ac322b9b3172 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -195,6 +195,7 @@ set(OLAP_TEST_FILES # olap/push_handler_test.cpp olap/tablet_cooldown_test.cpp olap/tablet_clone_test.cpp + olap/rowid_conversion_test.cpp ) set(RUNTIME_TEST_FILES
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
new file mode 100644
index 000000000000000..665a9b7db8a4e0a
--- /dev/null
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowid_conversion.h"
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <vector>
+
+#include "common/logging.h"
+
+namespace doris {
+
+class TestRowIdConversion : public testing::Test {
+public:
+    TestRowIdConversion() {}
+    virtual ~TestRowIdConversion() {}
+};
+
+// Feeds every row of two source rowsets to the conversion in output order and
+// checks where each source row ends up in the destination rowset.
+TEST_F(TestRowIdConversion, TestConversion) {
+    // source rows in output order: rowset_id, segment_id, row_id
+    const int kNumRows = 11;
+    int input_data[kNumRows][3] = {{0, 0, 0}, {0, 0, 1}, {0, 0, 2}, {0, 0, 3}, {0, 1, 0}, {0, 1, 1},
+                                   {0, 1, 2}, {1, 0, 0}, {1, 0, 1}, {1, 0, 2}, {1, 0, 3}};
+    // expected destination of input_data[i]: segment_id, row_id
+    // (the destination segments hold 4, 3 and 4 rows)
+    const uint32_t expected[kNumRows][2] = {{0, 0}, {0, 1}, {0, 2}, {0, 3}, {1, 0}, {1, 1},
+                                            {1, 2}, {2, 0}, {2, 1}, {2, 2}, {2, 3}};
+
+    RowsetId src_rowset;
+    RowsetId dst_rowset;
+    dst_rowset.init(3);
+
+    std::vector<RowsetSegmentRowId> rss_row_ids;
+    for (int i = 0; i < kNumRows; i++) {
+        src_rowset.init(input_data[i][0]);
+        RowsetSegmentRowId rss_row_id(src_rowset, input_data[i][1], input_data[i][2]);
+        rss_row_ids.push_back(rss_row_id);
+    }
+    RowIdConversion rowid_conversion;
+    src_rowset.init(0);
+    std::vector<uint32_t> rs0_segment_num_rows = {4, 3};
+    rowid_conversion.init_segment_map(src_rowset, rs0_segment_num_rows);
+    src_rowset.init(1);
+    std::vector<uint32_t> rs1_segment_num_rows = {4};
+    rowid_conversion.init_segment_map(src_rowset, rs1_segment_num_rows);
+
+    rowid_conversion.add(rss_row_ids);
+    std::vector<uint32_t> dst_segment_num_rows = {4, 3, 4};
+    rowid_conversion.set_dst_segment_num_rows(dst_rowset, dst_segment_num_rows);
+
+    // every source row must be found at its expected destination
+    for (int i = 0; i < kNumRows; i++) {
+        src_rowset.init(input_data[i][0]);
+        RowsetSegmentRowId src(src_rowset, input_data[i][1], input_data[i][2]);
+        RowsetSegmentRowId dst;
+        rowid_conversion.get(src, &dst);
+
+        EXPECT_EQ(dst.rowset_id, dst_rowset);
+        EXPECT_EQ(dst.segment_id, expected[i][0]);
+        EXPECT_EQ(dst.row_id, expected[i][1]);
+    }
+
+    // a row id beyond the size of its source segment is reported as missing
+    src_rowset.init(0);
+    RowsetSegmentRowId out_of_range(src_rowset, 1, 3);
+    RowsetSegmentRowId unused_dst;
+    EXPECT_NE(rowid_conversion.get(out_of_range, &unused_dst), 0);
+
+    // a rowset that was never registered is reported as missing
+    src_rowset.init(2);
+    RowsetSegmentRowId unknown_rowset(src_rowset, 0, 0);
+    EXPECT_NE(rowid_conversion.get(unknown_rowset, &unused_dst), 0);
+}
+
+} // namespace doris