diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 88c69ab5c9a584..769abe4a946913 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -70,7 +70,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& continue; } - int segment_start = 0; + int64_t segment_start = 0; auto split = RowSetSplits(reader->clone()); for (size_t i = 0; i != segments_rows.size(); ++i) { @@ -171,22 +171,18 @@ Status ParallelScannerBuilder::_load() { if (!_state->skip_delete_predicate()) { read_source.fill_delete_predicates(); } - bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache - ? _state->query_options().enable_segment_cache - : true; for (auto& rs_split : read_source.rs_splits) { auto rowset = rs_split.rs_reader->rowset(); RETURN_IF_ERROR(rowset->load()); const auto rowset_id = rowset->rowset_id(); - SegmentCacheHandle segment_cache_handle; - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( - std::dynamic_pointer_cast(rowset), &segment_cache_handle, - enable_segment_cache, false)); - - for (const auto& segment : segment_cache_handle.get_segments()) { - _all_segments_rows[rowset_id].emplace_back(segment->num_rows()); + auto beta_rowset = std::dynamic_pointer_cast(rowset); + std::vector segment_rows; + RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows)); + auto segment_count = rowset->num_segments(); + for (int64_t i = 0; i != segment_count; i++) { + _all_segments_rows[rowset_id].emplace_back(segment_rows[i]); } _total_rows += rowset->num_rows(); } diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index a328b1b9e8b90e..59019f93c6e7ff 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -42,6 +42,7 @@ #include "olap/rowset/segment_v2/inverted_index_cache.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_file_reader.h" +#include "olap/segment_loader.h" #include "olap/tablet_schema.h" #include "olap/utils.h" #include "util/crc32c.h" @@ -68,9 +69,23 @@ Status BetaRowset::init() { return Status::OK(); // no op } -Status BetaRowset::do_load(bool /*use_cache*/) { - // do nothing. - // the segments in this rowset will be loaded by calling load_segments() explicitly. +Status BetaRowset::get_segment_num_rows(std::vector* segment_rows) { + DCHECK(_rowset_state_machine.rowset_state() == ROWSET_LOADED); + + RETURN_IF_ERROR(_load_segment_rows_once.call([this] { + auto segment_count = num_segments(); + _segments_rows.resize(segment_count); + for (int64_t i = 0; i != segment_count; ++i) { + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( + std::static_pointer_cast(shared_from_this()), i, + &segment_cache_handle, false, false)); + const auto& tmp_segments = segment_cache_handle.get_segments(); + _segments_rows[i] = tmp_segments[0]->num_rows(); + } + return Status::OK(); + })); + segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 52d5ac5c8a8742..c9ed568186a8e9 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -89,6 +89,8 @@ class BetaRowset final : public Rowset { Status show_nested_index_file(rapidjson::Value* rowset_value, rapidjson::Document::AllocatorType& allocator); + Status get_segment_num_rows(std::vector* segment_rows); + protected: BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta, std::string tablet_path); @@ -96,8 +98,6 @@ class BetaRowset final : public Rowset { // init segment groups Status init() override; - Status do_load(bool use_cache) override; - void do_close() override; Status check_current_rowset_segment() override; @@ -107,6 +107,9 @@ class BetaRowset final : public Rowset { private: friend class RowsetFactory; friend class BetaRowsetReader; + + DorisCallOnce _load_segment_rows_once; + std::vector _segments_rows; }; } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 9a4d71587a02c1..822916819fed13 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -215,7 +215,6 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.io_ctx.expiration_time = 0; } - // load segments bool enable_segment_cache = true; auto* state = read_context->runtime_state; if (state != nullptr) { @@ -226,76 +225,41 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context // When reader type is for query, session variable `enable_segment_cache` should be respected. bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY && enable_segment_cache); - SegmentCacheHandle segment_cache_handle; - { - SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns); - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( - _rowset, &segment_cache_handle, should_use_cache, - /*need_load_pk_index_and_bf*/ false)); - } - - // create iterator for each segment - auto& segments = segment_cache_handle.get_segments(); - _segments_rows.resize(segments.size()); - for (size_t i = 0; i < segments.size(); i++) { - _segments_rows[i] = segments[i]->num_rows(); - } - if (_read_context->record_rowids) { - // init segment rowid map for rowid conversion - std::vector segment_num_rows; - RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows)); - RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), - segment_num_rows)); - } + auto segment_count = _rowset->num_segments(); auto [seg_start, seg_end] = _segment_offsets; + // If seg_start == seg_end, it means that the segments of a rowset is not + // split scanned by multiple scanners, and the rowset reader is used to read the whole rowset. if (seg_start == seg_end) { seg_start = 0; - seg_end = segments.size(); + seg_end = segment_count; + } + if (_read_context->record_rowids && _read_context->rowid_conversion) { + // init segment rowid map for rowid conversion + std::vector segment_rows; + RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows)); + RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), + segment_rows)); } - const bool is_merge_iterator = _is_merge_iterator(); - const bool use_lazy_init_iterators = - !is_merge_iterator && _read_context->reader_type == ReaderType::READER_QUERY; - for (int i = seg_start; i < seg_end; i++) { + for (int64_t i = seg_start; i < seg_end; i++) { SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns); - auto& seg_ptr = segments[i]; std::unique_ptr iter; - if (use_lazy_init_iterators) { - /// For non-merging iterators, we don't need to initialize them all at once when creating them. - /// Instead, we should initialize each iterator separately when really using them. - /// This optimization minimizes the lifecycle of resources like column readers - /// and prevents excessive memory consumption, especially for wide tables. - if (_segment_row_ranges.empty()) { - _read_options.row_ranges.clear(); - iter = std::make_unique(seg_ptr, _input_schema, - _read_options); - } else { - DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); - auto local_options = _read_options; - local_options.row_ranges = _segment_row_ranges[i - seg_start]; - iter = std::make_unique(seg_ptr, _input_schema, - local_options); - } + /// For iterators, we don't need to initialize them all at once when creating them. + /// Instead, we should initialize each iterator separately when really using them. + /// This optimization minimizes the lifecycle of resources like column readers + /// and prevents excessive memory consumption, especially for wide tables. + if (_segment_row_ranges.empty()) { + _read_options.row_ranges.clear(); + iter = std::make_unique(_rowset, i, should_use_cache, + _input_schema, _read_options); } else { - Status status; - /// If `_segment_row_ranges` is empty, the segment is not split. - if (_segment_row_ranges.empty()) { - _read_options.row_ranges.clear(); - status = seg_ptr->new_iterator(_input_schema, _read_options, &iter); - } else { - DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); - auto local_options = _read_options; - local_options.row_ranges = _segment_row_ranges[i - seg_start]; - status = seg_ptr->new_iterator(_input_schema, local_options, &iter); - } - - if (!status.ok()) { - LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() - << "]: " << status.to_string(); - return Status::Error(status.to_string()); - } + DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size()); + auto local_options = _read_options; + local_options.row_ranges = _segment_row_ranges[i - seg_start]; + iter = std::make_unique(_rowset, i, should_use_cache, + _input_schema, local_options); } if (iter->empty()) { @@ -423,10 +387,4 @@ bool BetaRowsetReader::_should_push_down_value_predicates() const { _read_context->sequence_id_idx == -1) || _read_context->enable_unique_key_merge_on_write); } - -Status BetaRowsetReader::get_segment_num_rows(std::vector* segment_num_rows) { - segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); - return Status::OK(); -} - } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 33b2fb6a58c08b..b191480f7c7d04 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -80,8 +80,6 @@ class BetaRowsetReader : public RowsetReader { return _iterator->current_block_row_locations(locations); } - Status get_segment_num_rows(std::vector* segment_num_rows) override; - bool update_profile(RuntimeProfile* profile) override; RowsetReaderSharedPtr clone() override; @@ -97,7 +95,7 @@ class BetaRowsetReader : public RowsetReader { _rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1; } - int32_t _get_segment_num() const { + int64_t _get_segment_num() const { auto [seg_start, seg_end] = _segment_offsets; if (seg_start == seg_end) { seg_start = 0; @@ -108,7 +106,7 @@ class BetaRowsetReader : public RowsetReader { DorisCallOnce _init_iter_once; - std::pair _segment_offsets; + std::pair _segment_offsets; std::vector _segment_row_ranges; SchemaSPtr _input_schema; @@ -120,8 +118,6 @@ class BetaRowsetReader : public RowsetReader { std::unique_ptr _iterator; - std::vector _segments_rows; - StorageReadOptions _read_options; bool _empty = false; diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index ac3a2a7a1dc5c2..0fd8e60f7cefdf 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -67,8 +67,6 @@ Status Rowset::load(bool use_cache) { std::lock_guard load_lock(_lock); // after lock, if rowset state is ROWSET_UNLOADING, it is ok to return if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) { - // first do load, then change the state - RETURN_IF_ERROR(do_load(use_cache)); RETURN_IF_ERROR(_rowset_state_machine.on_load()); } } diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 98d88ba19f2068..047a61a39ea22e 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -321,9 +321,6 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset virtual Status init() = 0; - // The actual implementation of load(). Guaranteed by to called exactly once. - virtual Status do_load(bool use_cache) = 0; - // release resources in this api virtual void do_close() = 0; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 58c0f592b9c545..6c637f47cc17e6 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -40,7 +40,7 @@ struct RowSetSplits { // if segment_offsets is not empty, means we only scan // [pair.first, pair.second) segment in rs_reader, only effective in dup key // and pipeline - std::pair segment_offsets; + std::pair segment_offsets; // RowRanges of each segment. std::vector segment_row_ranges; @@ -83,10 +83,6 @@ class RowsetReader { return Status::NotSupported("to be implemented"); } - virtual Status get_segment_num_rows(std::vector* segment_num_rows) { - return Status::NotSupported("to be implemented"); - } - virtual bool update_profile(RuntimeProfile* profile) = 0; virtual RowsetReaderSharedPtr clone() = 0; diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index fd4fe7a18234f1..2dd7132890247d 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -76,7 +76,7 @@ struct RowsetReaderContext { bool enable_unique_key_merge_on_write = false; const DeleteBitmap* delete_bitmap = nullptr; bool record_rowids = false; - RowIdConversion* rowid_conversion; + RowIdConversion* rowid_conversion = nullptr; bool is_key_column_group = false; const std::set* output_columns = nullptr; RowsetId rowset_id; diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp index d70df5a7baeae9..77e2310fc48cc1 100644 --- a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp @@ -17,11 +17,18 @@ #include "olap/rowset/segment_v2/lazy_init_segment_iterator.h" +#include "olap/segment_loader.h" + namespace doris::segment_v2 { -LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr segment, - SchemaSPtr schema, const StorageReadOptions& opts) - : _schema(std::move(schema)), _segment(std::move(segment)), _read_options(opts) {} +LazyInitSegmentIterator::LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id, + bool should_use_cache, SchemaSPtr schema, + const StorageReadOptions& opts) + : _rowset(std::move(rowset)), + _segment_id(segment_id), + _should_use_cache(should_use_cache), + _schema(std::move(schema)), + _read_options(opts) {} /// Here do not use the argument of `opts`, /// see where the iterator is created in `BetaRowsetReader::get_segment_iterators` @@ -31,7 +38,15 @@ Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) { return Status::OK(); } - RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, &_inner_iterator)); + std::shared_ptr segment; + { + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( + _rowset, _segment_id, &segment_cache_handle, _should_use_cache, false)); + const auto& tmp_segments = segment_cache_handle.get_segments(); + segment = tmp_segments[0]; + } + RETURN_IF_ERROR(segment->new_iterator(_schema, _read_options, &_inner_iterator)); return _inner_iterator->init(_read_options); } diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h index 923c540c456999..c31918d092c0d1 100644 --- a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h @@ -22,14 +22,18 @@ #include "olap/rowset/segment_v2/segment_iterator.h" #include "vec/core/block.h" +namespace doris { +class BetaRowset; +using BetaRowsetSharedPtr = std::shared_ptr; +}; // namespace doris namespace doris::segment_v2 { using namespace vectorized; class LazyInitSegmentIterator : public RowwiseIterator { public: - LazyInitSegmentIterator(std::shared_ptr segment, SchemaSPtr schema, - const StorageReadOptions& opts); + LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id, bool should_use_cache, + SchemaSPtr schema, const StorageReadOptions& opts); ~LazyInitSegmentIterator() override = default; @@ -59,8 +63,10 @@ class LazyInitSegmentIterator : public RowwiseIterator { private: bool _need_lazy_init {true}; + BetaRowsetSharedPtr _rowset; + int64_t _segment_id {-1}; + bool _should_use_cache {false}; SchemaSPtr _schema = nullptr; - std::shared_ptr _segment; StorageReadOptions _read_options; RowwiseIteratorUPtr _inner_iterator; }; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 4240f7e250a06b..1bd21ad4e553c0 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -52,6 +52,37 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) { LRUCachePolicy::erase(key.encode()); } +Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id, + SegmentCacheHandle* cache_handle, bool use_cache, + bool need_load_pk_index_and_bf, + OlapReaderStatistics* index_load_stats) { + SegmentCache::CacheKey cache_key(rowset->rowset_id(), segment_id); + if (_segment_cache->lookup(cache_key, cache_handle)) { + // Has to check the segment status here, because the segment in cache may has something wrong during + // load index or create column reader. + // Not merge this if logic with previous to make the logic more clear. + if (cache_handle->pop_unhealthy_segment() == nullptr) { + return Status::OK(); + } + } + // If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache. + segment_v2::SegmentSharedPtr segment; + RETURN_IF_ERROR(rowset->load_segment(segment_id, &segment)); + if (need_load_pk_index_and_bf) { + RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats)); + } + if (use_cache && !config::disable_segment_cache) { + // memory of SegmentCache::CacheValue will be handled by SegmentCache + auto* cache_value = new SegmentCache::CacheValue(segment); + _cache_mem_usage += segment->meta_mem_usage(); + _segment_cache->insert(cache_key, *cache_value, cache_handle); + } else { + cache_handle->push_segment(std::move(segment)); + } + + return Status::OK(); +} + Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle, bool use_cache, bool need_load_pk_index_and_bf, @@ -60,29 +91,8 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, return Status::OK(); } for (int64_t i = 0; i < rowset->num_segments(); i++) { - SegmentCache::CacheKey cache_key(rowset->rowset_id(), i); - if (_segment_cache->lookup(cache_key, cache_handle)) { - // Has to check the segment status here, because the segment in cache may has something wrong during - // load index or create column reader. - // Not merge this if logic with previous to make the logic more clear. - if (cache_handle->pop_unhealthy_segment() == nullptr) { - continue; - } - } - // If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache. - segment_v2::SegmentSharedPtr segment; - RETURN_IF_ERROR(rowset->load_segment(i, &segment)); - if (need_load_pk_index_and_bf) { - RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats)); - } - if (use_cache && !config::disable_segment_cache) { - // memory of SegmentCache::CacheValue will be handled by SegmentCache - auto* cache_value = new SegmentCache::CacheValue(segment); - _cache_mem_usage += segment->meta_mem_usage(); - _segment_cache->insert(cache_key, *cache_value, cache_handle); - } else { - cache_handle->push_segment(std::move(segment)); - } + RETURN_IF_ERROR(load_segment(rowset, i, cache_handle, use_cache, need_load_pk_index_and_bf, + index_load_stats)); } cache_handle->set_inited(); return Status::OK(); diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 2c5b1ed200dde7..d1813f434cb278 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -120,6 +120,13 @@ class SegmentLoader { bool use_cache = false, bool need_load_pk_index_and_bf = false, OlapReaderStatistics* index_load_stats = nullptr); + // Load one segment of "rowset", return the "cache_handle" which contains segments. + // If use_cache is true, it will be loaded from _cache. + Status load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id, + SegmentCacheHandle* cache_handle, bool use_cache = false, + bool need_load_pk_index_and_bf = false, + OlapReaderStatistics* index_load_stats = nullptr); + void erase_segment(const SegmentCache::CacheKey& key); void erase_segments(const RowsetId& rowset_id, int64_t num_segments); diff --git a/be/test/olap/ordered_data_compaction_test.cpp b/be/test/olap/ordered_data_compaction_test.cpp index 058ed52dd995dc..831e3031378e4e 100644 --- a/be/test/olap/ordered_data_compaction_test.cpp +++ b/be/test/olap/ordered_data_compaction_test.cpp @@ -464,8 +464,6 @@ TEST_F(OrderedDataCompactionTest, test_01) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index df56cd0559e4d8..52d81f2100aba4 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -382,8 +382,9 @@ class TestRowIdConversion : public testing::TestWithParam()) << s; EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); + auto beta_rowset = std::dynamic_pointer_cast(out_rowset); std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok()); if (has_delete_handler) { // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index 62a3232889dede..5463de03f2bf19 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -239,7 +239,6 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { reader_context.stats = &_stats; reader_context.delete_bitmap = delete_bitmap.get(); - std::vector segment_num_rows; Status s; // without predicates @@ -280,7 +279,9 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows); EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted); - EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + auto beta_rowset = std::dynamic_pointer_cast(rowset); + std::vector segment_num_rows; + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -307,7 +308,6 @@ TEST_P(SegCompactionMoWTest, SegCompactionThenRead) { config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment config::segcompaction_batch_size = 10; - std::vector segment_num_rows; DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); uint32_t rows_mark_deleted = 0; { // write `num_segments * rows_per_segment` rows to rowset @@ -413,7 +413,6 @@ TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); uint32_t rows_mark_deleted = 0; uint32_t total_written_rows = 0; - std::vector segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; create_rowset_writer_context(20048, tablet_schema, &writer_context); @@ -641,7 +640,6 @@ TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) { RowsetSharedPtr rowset; config::segcompaction_candidate_max_rows = 6000; // set threshold above config::segcompaction_batch_size = 5; - std::vector segment_num_rows; DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); uint32_t rows_mark_deleted = 0; uint32_t total_written_rows = 0; @@ -832,7 +830,6 @@ TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) { config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment config::segcompaction_batch_size = 10; - std::vector segment_num_rows; DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); uint32_t rows_mark_deleted = 0; { // write `num_segments * rows_per_segment` rows to rowset diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index 4029467dd42fbb..4a1dfc63fd9b35 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -291,7 +291,6 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment config::segcompaction_batch_size = 10; - std::vector segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; create_rowset_writer_context(10047, tablet_schema, &writer_context); @@ -387,7 +386,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read); EXPECT_EQ(num_rows_read, num_segments * rows_per_segment); - EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + auto beta_rowset = std::dynamic_pointer_cast(rowset); + std::vector segment_num_rows; + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -406,7 +407,6 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { RowsetSharedPtr rowset; config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment - std::vector segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; create_rowset_writer_context(10048, tablet_schema, &writer_context); @@ -561,7 +561,6 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_OoOoO) { RowsetSharedPtr rowset; config::segcompaction_candidate_max_rows = 6000; // set threshold above config::segcompaction_batch_size = 5; - std::vector segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; create_rowset_writer_context(10049, tablet_schema, &writer_context); @@ -693,7 +692,6 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment config::segcompaction_batch_size = 3; - std::vector segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; create_rowset_writer_context(10051, tablet_schema, &writer_context); @@ -894,7 +892,9 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { // duplicated keys between segments are counted duplicately // so actual read by rowset reader is less or equal to it EXPECT_GE(rowset->rowset_meta()->num_rows(), num_rows_read); - EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + auto beta_rowset = std::dynamic_pointer_cast(rowset); + std::vector segment_num_rows; + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -926,7 +926,6 @@ TEST_F(SegCompactionTest, CreateSegCompactionWriter) { config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment config::segcompaction_batch_size = 3; - std::vector segment_num_rows; { RowsetWriterContext writer_context; create_rowset_writer_context(10052, tablet_schema, &writer_context); @@ -956,7 +955,6 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { config::segcompaction_candidate_max_rows = 6000; // set threshold above // rows_per_segment config::segcompaction_batch_size = 3; - std::vector segment_num_rows; { // write `num_segments * rows_per_segment` rows to rowset RowsetWriterContext writer_context; create_rowset_writer_context(10052, tablet_schema, &writer_context); @@ -1159,7 +1157,9 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { // duplicated keys between segments are counted duplicately // so actual read by rowset reader is less or equal to it EXPECT_GE(rowset->rowset_meta()->num_rows(), num_rows_read); - EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + auto beta_rowset = std::dynamic_pointer_cast(rowset); + std::vector segment_num_rows; + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h index 36ef64fd9ec9fb..f1472b435b5ac2 100644 --- a/be/test/testutil/mock_rowset.h +++ b/be/test/testutil/mock_rowset.h @@ -73,10 +73,6 @@ class MockRowset : public Rowset { Status init() override { return Status::NotSupported("MockRowset not support this method."); } - Status do_load(bool use_cache) override { - return Status::NotSupported("MockRowset not support this method."); - } - void do_close() override { // Do nothing. } diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 4c4409a75068c1..dd6f6932efcb26 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -521,8 +521,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " @@ -628,8 +626,6 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " @@ -736,8 +732,6 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " @@ -848,8 +842,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment - num_input_rowset * 100); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { ASSERT_GE(std::get<0>(item), 100); @@ -951,8 +943,6 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment - num_input_rowset * 100); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { ASSERT_GE(std::get<0>(item), 100); @@ -1042,8 +1032,6 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "