diff --git a/be/src/olap/collection_statistics.cpp b/be/src/olap/collection_statistics.cpp index 9f0c07abe603f2..714a19fe6b7798 100644 --- a/be/src/olap/collection_statistics.cpp +++ b/be/src/olap/collection_statistics.cpp @@ -17,6 +17,8 @@ #include "collection_statistics.h" +#include + #include "common/exception.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" @@ -35,21 +37,22 @@ namespace doris { Status CollectionStatistics::collect( RuntimeState* state, const std::vector& rs_splits, const TabletSchemaSPtr& tablet_schema, - const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down) { + const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, io::IOContext* io_ctx) { std::unordered_map collect_infos; RETURN_IF_ERROR( extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema, &collect_infos)); + if (collect_infos.empty()) { + LOG(WARNING) << "Index statistics collection: no collect info extracted."; + return Status::OK(); + } for (const auto& rs_split : rs_splits) { const auto& rs_reader = rs_split.rs_reader; auto rowset = rs_reader->rowset(); - auto rowset_meta = rowset->rowset_meta(); - auto num_segments = rowset->num_segments(); for (int32_t seg_id = 0; seg_id < num_segments; ++seg_id) { - auto seg_path = DORIS_TRY(rowset->segment_path(seg_id)); - auto status = process_segment(seg_path, rowset_meta->fs(), tablet_schema.get(), - collect_infos); + auto status = + process_segment(rowset, seg_id, tablet_schema.get(), collect_infos, io_ctx); if (!status.ok()) { if (status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND || status.code() == ErrorCode::INVERTED_INDEX_BYPASS) { @@ -62,15 +65,29 @@ Status CollectionStatistics::collect( } #ifndef NDEBUG - LOG(INFO) << "term_num_docs: " << _total_num_docs; + std::stringstream ss; + ss << "term_num_docs: " << _total_num_docs; for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) { - LOG(INFO) << "field_name: " << StringHelper::to_string(ws_field_name) - << ", num_tokens: " << num_tokens; - for (const auto& [term, doc_freq] : _term_doc_freqs.at(ws_field_name)) { - LOG(INFO) << "term: " << StringHelper::to_string(term) << ", doc_freq: " << doc_freq; + ss << ", [field_name: " << StringHelper::to_string(ws_field_name) + << ", num_tokens: " << num_tokens; + auto it = _term_doc_freqs.find(ws_field_name); + if (it != _term_doc_freqs.end()) { + ss << ", terms: {"; + bool first = true; + for (const auto& [term, doc_freq] : it->second) { + if (!first) { + ss << ", "; + } + ss << StringHelper::to_string(term) << ": " << doc_freq; + first = false; + } + ss << "}"; + } else { + ss << ", (no term stats)"; } + ss << "]"; } - LOG(INFO) << "--------------------------------"; + LOG(INFO) << "CollectionStatistics: " << ss.str(); #endif return Status::OK(); @@ -136,6 +153,11 @@ Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_sch auto term_infos = InvertedIndexAnalyzer::get_analyse_result( right_literal->value(format_options), index_meta->properties()); + if (term_infos.empty()) { + LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, " + << "col_unique_id=" << index_meta->col_unique_ids()[0]; + continue; + } std::string field_name = std::to_string(index_meta->col_unique_ids()[0]); if (!column.suffix_path().empty()) { @@ -188,18 +210,22 @@ Status CollectionStatistics::extract_collect_info( } Status CollectionStatistics::process_segment( - const std::string& seg_path, const io::FileSystemSPtr& fs, - const TabletSchema* tablet_schema, - const std::unordered_map& collect_infos) { + const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema, + const std::unordered_map& collect_infos, io::IOContext* io_ctx) { + auto seg_path = DORIS_TRY(rowset->segment_path(seg_id)); + auto rowset_meta = rowset->rowset_meta(); + auto idx_file_reader = std::make_unique( - fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, - tablet_schema->get_inverted_index_storage_format()); - RETURN_IF_ERROR(idx_file_reader->init()); + rowset_meta->fs(), + std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, + tablet_schema->get_inverted_index_storage_format(), + rowset_meta->inverted_index_file_info(seg_id)); + RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx)); int32_t total_seg_num_docs = 0; for (const auto& [ws_field_name, collect_info] : collect_infos) { #ifdef BE_TEST - auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr)); + auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx)); auto* reader = lucene::index::IndexReader::open(compound_reader.get()); auto index_searcher = std::make_shared(reader, true); @@ -211,7 +237,7 @@ Status CollectionStatistics::process_segment( if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key, &inverted_index_cache_handle)) { auto compound_reader = - DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr)); + DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx)); auto* reader = lucene::index::IndexReader::open(compound_reader.get()); size_t reader_size = reader->getTermInfosRAMUsed(); auto index_searcher = std::make_shared(reader, true); @@ -231,7 +257,7 @@ Status CollectionStatistics::process_segment( index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0); for (const auto& term_info : collect_info.term_infos) { - auto iter = TermIterator::create(nullptr, false, index_reader, ws_field_name, + auto iter = TermIterator::create(io_ctx, false, index_reader, ws_field_name, term_info.get_single_term()); _term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq(); } diff --git a/be/src/olap/collection_statistics.h b/be/src/olap/collection_statistics.h index 87a91c755a3623..9fdd3ddde30650 100644 --- a/be/src/olap/collection_statistics.h +++ b/be/src/olap/collection_statistics.h @@ -32,10 +32,14 @@ namespace doris { namespace io { class FileSystem; using FileSystemSPtr = std::shared_ptr; +struct IOContext; } // namespace io struct RowSetSplits; +class Rowset; +using RowsetSharedPtr = std::shared_ptr; + class TabletIndex; class TabletSchema; using TabletSchemaSPtr = std::shared_ptr; @@ -59,7 +63,8 @@ class CollectionStatistics { Status collect(RuntimeState* state, const std::vector& rs_splits, const TabletSchemaSPtr& tablet_schema, - const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down); + const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, + io::IOContext* io_ctx); MOCK_FUNCTION float get_or_calculate_idf(const std::wstring& lucene_col_name, const std::wstring& term); @@ -70,9 +75,10 @@ class CollectionStatistics { const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, const TabletSchemaSPtr& tablet_schema, std::unordered_map* collect_infos); - Status process_segment(const std::string& seg_path, const io::FileSystemSPtr& fs, + Status process_segment(const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema, - const std::unordered_map& collect_infos); + const std::unordered_map& collect_infos, + io::IOContext* io_ctx); uint64_t get_term_doc_freq_by_col(const std::wstring& lucene_col_name, const std::wstring& term); diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 7738c195474f64..962a2fe14ce47e 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -196,6 +196,7 @@ Status OlapScanLocalState::_init_profile() { _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT); _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT); + _statistics_collect_timer = ADD_TIMER(_scanner_profile, "StatisticsCollectTime"); _inverted_index_filter_counter = ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT); _inverted_index_filter_timer = ADD_TIMER(_segment_profile, "InvertedIndexFilterTime"); diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 331091a36504c4..4d6910b065f143 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -193,6 +193,7 @@ class OlapScanLocalState final : public ScanLocalState { // used by segment v2 RuntimeProfile::Counter* _cached_pages_num_counter = nullptr; + RuntimeProfile::Counter* _statistics_collect_timer = nullptr; RuntimeProfile::Counter* _inverted_index_filter_counter = nullptr; RuntimeProfile::Counter* _inverted_index_filter_timer = nullptr; RuntimeProfile::Counter* _inverted_index_query_null_bitmap_timer = nullptr; diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 9f72d5caeddf86..b5a3cd62507c71 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -288,10 +288,20 @@ Status OlapScanner::prepare() { } if (_tablet_reader_params.score_runtime) { + SCOPED_TIMER(local_state->_statistics_collect_timer); _tablet_reader_params.collection_statistics = std::make_shared(); + + io::IOContext io_ctx { + .reader_type = ReaderType::READER_QUERY, + .expiration_time = tablet->ttl_seconds(), + .query_id = &_state->query_id(), + .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats, + .is_inverted_index = true, + }; + RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect( _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema, - _tablet_reader_params.common_expr_ctxs_push_down)); + _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx)); } _has_prepared = true; diff --git a/be/test/olap/collection_statistics_test.cpp b/be/test/olap/collection_statistics_test.cpp index 7988f84216150b..c5b9b7054e7a76 100644 --- a/be/test/olap/collection_statistics_test.cpp +++ b/be/test/olap/collection_statistics_test.cpp @@ -327,7 +327,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptyRowsetSplits) { std::vector empty_splits; - auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, expr_contexts); + auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, expr_contexts, + nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -337,8 +338,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptyExpressions) { std::vector empty_splits; - auto status = - stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, empty_contexts); + auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, empty_contexts, + nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -353,7 +354,8 @@ TEST_F(CollectionStatisticsTest, CollectWithNonMatchExpression) { std::vector empty_splits; - auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts); + auto status = + stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -380,7 +382,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMultipleMatchExpressions) { std::vector empty_splits; - auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts); + auto status = + stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -407,7 +410,8 @@ TEST_F(CollectionStatisticsTest, CollectWithNestedExpressions) { std::vector empty_splits; - auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts); + auto status = + stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -417,7 +421,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMockRowsetSplits) { auto splits = create_mock_rowset_splits(2); - auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts); + auto status = + stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr); EXPECT_TRUE(status.ok()); } @@ -428,7 +433,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptySegments) { auto splits = create_mock_rowset_splits(0); - auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts); + auto status = + stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -450,7 +456,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMultipleRowsetSplits) { splits.push_back(split); } - auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts); + auto status = + stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -581,7 +588,8 @@ TEST_F(CollectionStatisticsTest, CollectWithCastWrappedSlotRef) { contexts.push_back(std::make_shared(match_expr)); std::vector empty_splits; - auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts); + auto status = + stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } @@ -605,7 +613,8 @@ TEST_F(CollectionStatisticsTest, CollectWithDoubleCastWrappedSlotRef) { contexts.push_back(std::make_shared(match_expr)); std::vector empty_splits; - auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts); + auto status = + stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr); EXPECT_TRUE(status.ok()) << status.msg(); } diff --git a/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy b/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy new file mode 100644 index 00000000000000..2b118436c07f55 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.regex.Pattern + +suite('test_inverted_index_collection_stats', 'p0') { + def indexTbName1 = "test_inverted_index_collection_stats_tbl" + + sql "DROP TABLE IF EXISTS ${indexTbName1}" + + sql """ + CREATE TABLE ${indexTbName1} ( + `id` int(11) NULL COMMENT "", + `content` text NULL COMMENT "", + INDEX content_idx (`content`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ INSERT INTO ${indexTbName1} VALUES (1, 'hello world'), (2, 'hello doris'), (3, 'doris is great') """ + + sql "sync" + + // Enable profile + sql """ set enable_profile = true; """ + sql """ set profile_level = 2; """ + sql """ set enable_common_expr_pushdown = true; """ + sql """ set enable_common_expr_pushdown_for_inverted_index = true; """ + + // Execute MATCH_ALL query which triggers CollectionStatistics::collect + def queryId = "test_inverted_index_collection_stats_${System.currentTimeMillis()}" + try { + profile("${queryId}") { + run { + sql "/* ${queryId} */ select score() as score from ${indexTbName1} where content match_all 'hello' order by score desc limit 10" + } + + check { profileString, exception -> + def statisticsCollectTime = 0 + def matcher = Pattern.compile("StatisticsCollectTime:\\s*(\\d+)").matcher(profileString) + if (matcher.find()) { + statisticsCollectTime = Integer.parseInt(matcher.group(1)) + log.info("StatisticsCollectTime: {}", statisticsCollectTime) + } + assertTrue(statisticsCollectTime > 0, "StatisticsCollectTime should be > 0, got: ${statisticsCollectTime}") + } + } + } catch (Exception e) { + if (e.message?.contains("HttpCliAction failed")) { + log.warn("Profile HTTP request failed, skipping profile check: {}", e.message) + } else { + log.warn("Profile check failed: {}", e.message) + throw e + } + } finally { + // sql "DROP TABLE IF EXISTS ${indexTbName1}" + } +}