Skip to content

Commit 33edbe7

Browse files
zzzxl1993 authored and Your Name committed
[opt](inverted index) optimize S3 operations for inverted index (#59363)
1 parent 708affa commit 33edbe7

File tree

7 files changed

+166
-36
lines changed

7 files changed

+166
-36
lines changed

be/src/olap/collection_statistics.cpp

Lines changed: 47 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
#include "collection_statistics.h"
1919

20+
#include <sstream>
21+
2022
#include "common/exception.h"
2123
#include "olap/rowset/rowset.h"
2224
#include "olap/rowset/rowset_reader.h"
@@ -35,21 +37,22 @@ namespace doris {
3537
Status CollectionStatistics::collect(
3638
RuntimeState* state, const std::vector<RowSetSplits>& rs_splits,
3739
const TabletSchemaSPtr& tablet_schema,
38-
const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down) {
40+
const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, io::IOContext* io_ctx) {
3941
std::unordered_map<std::wstring, CollectInfo> collect_infos;
4042
RETURN_IF_ERROR(
4143
extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema, &collect_infos));
44+
if (collect_infos.empty()) {
45+
LOG(WARNING) << "Index statistics collection: no collect info extracted.";
46+
return Status::OK();
47+
}
4248

4349
for (const auto& rs_split : rs_splits) {
4450
const auto& rs_reader = rs_split.rs_reader;
4551
auto rowset = rs_reader->rowset();
46-
auto rowset_meta = rowset->rowset_meta();
47-
4852
auto num_segments = rowset->num_segments();
4953
for (int32_t seg_id = 0; seg_id < num_segments; ++seg_id) {
50-
auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
51-
auto status = process_segment(seg_path, rowset_meta->fs(), tablet_schema.get(),
52-
collect_infos);
54+
auto status =
55+
process_segment(rowset, seg_id, tablet_schema.get(), collect_infos, io_ctx);
5356
if (!status.ok()) {
5457
if (status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
5558
status.code() == ErrorCode::INVERTED_INDEX_BYPASS) {
@@ -62,15 +65,29 @@ Status CollectionStatistics::collect(
6265
}
6366

6467
#ifndef NDEBUG
65-
LOG(INFO) << "term_num_docs: " << _total_num_docs;
68+
std::stringstream ss;
69+
ss << "term_num_docs: " << _total_num_docs;
6670
for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) {
67-
LOG(INFO) << "field_name: " << StringHelper::to_string(ws_field_name)
68-
<< ", num_tokens: " << num_tokens;
69-
for (const auto& [term, doc_freq] : _term_doc_freqs.at(ws_field_name)) {
70-
LOG(INFO) << "term: " << StringHelper::to_string(term) << ", doc_freq: " << doc_freq;
71+
ss << ", [field_name: " << StringHelper::to_string(ws_field_name)
72+
<< ", num_tokens: " << num_tokens;
73+
auto it = _term_doc_freqs.find(ws_field_name);
74+
if (it != _term_doc_freqs.end()) {
75+
ss << ", terms: {";
76+
bool first = true;
77+
for (const auto& [term, doc_freq] : it->second) {
78+
if (!first) {
79+
ss << ", ";
80+
}
81+
ss << StringHelper::to_string(term) << ": " << doc_freq;
82+
first = false;
83+
}
84+
ss << "}";
85+
} else {
86+
ss << ", (no term stats)";
7187
}
88+
ss << "]";
7289
}
73-
LOG(INFO) << "--------------------------------";
90+
LOG(INFO) << "CollectionStatistics: " << ss.str();
7491
#endif
7592

7693
return Status::OK();
@@ -136,6 +153,11 @@ Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_sch
136153

137154
auto term_infos = InvertedIndexAnalyzer::get_analyse_result(
138155
right_literal->value(format_options), index_meta->properties());
156+
if (term_infos.empty()) {
157+
LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, "
158+
<< "col_unique_id=" << index_meta->col_unique_ids()[0];
159+
continue;
160+
}
139161

140162
std::string field_name = std::to_string(index_meta->col_unique_ids()[0]);
141163
if (!column.suffix_path().empty()) {
@@ -188,18 +210,22 @@ Status CollectionStatistics::extract_collect_info(
188210
}
189211

190212
Status CollectionStatistics::process_segment(
191-
const std::string& seg_path, const io::FileSystemSPtr& fs,
192-
const TabletSchema* tablet_schema,
193-
const std::unordered_map<std::wstring, CollectInfo>& collect_infos) {
213+
const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema,
214+
const std::unordered_map<std::wstring, CollectInfo>& collect_infos, io::IOContext* io_ctx) {
215+
auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
216+
auto rowset_meta = rowset->rowset_meta();
217+
194218
auto idx_file_reader = std::make_unique<IndexFileReader>(
195-
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
196-
tablet_schema->get_inverted_index_storage_format());
197-
RETURN_IF_ERROR(idx_file_reader->init());
219+
rowset_meta->fs(),
220+
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
221+
tablet_schema->get_inverted_index_storage_format(),
222+
rowset_meta->inverted_index_file_info(seg_id));
223+
RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx));
198224

199225
int32_t total_seg_num_docs = 0;
200226
for (const auto& [ws_field_name, collect_info] : collect_infos) {
201227
#ifdef BE_TEST
202-
auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr));
228+
auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
203229
auto* reader = lucene::index::IndexReader::open(compound_reader.get());
204230
auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);
205231

@@ -211,7 +237,7 @@ Status CollectionStatistics::process_segment(
211237
if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
212238
&inverted_index_cache_handle)) {
213239
auto compound_reader =
214-
DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr));
240+
DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
215241
auto* reader = lucene::index::IndexReader::open(compound_reader.get());
216242
size_t reader_size = reader->getTermInfosRAMUsed();
217243
auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);
@@ -231,7 +257,7 @@ Status CollectionStatistics::process_segment(
231257
index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0);
232258

233259
for (const auto& term_info : collect_info.term_infos) {
234-
auto iter = TermIterator::create(nullptr, false, index_reader, ws_field_name,
260+
auto iter = TermIterator::create(io_ctx, false, index_reader, ws_field_name,
235261
term_info.get_single_term());
236262
_term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq();
237263
}

be/src/olap/collection_statistics.h

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,10 +32,14 @@ namespace doris {
3232
namespace io {
3333
class FileSystem;
3434
using FileSystemSPtr = std::shared_ptr<FileSystem>;
35+
struct IOContext;
3536
} // namespace io
3637

3738
struct RowSetSplits;
3839

40+
class Rowset;
41+
using RowsetSharedPtr = std::shared_ptr<Rowset>;
42+
3943
class TabletIndex;
4044
class TabletSchema;
4145
using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;
@@ -59,7 +63,8 @@ class CollectionStatistics {
5963

6064
Status collect(RuntimeState* state, const std::vector<RowSetSplits>& rs_splits,
6165
const TabletSchemaSPtr& tablet_schema,
62-
const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down);
66+
const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
67+
io::IOContext* io_ctx);
6368

6469
MOCK_FUNCTION float get_or_calculate_idf(const std::wstring& lucene_col_name,
6570
const std::wstring& term);
@@ -70,9 +75,10 @@ class CollectionStatistics {
7075
const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
7176
const TabletSchemaSPtr& tablet_schema,
7277
std::unordered_map<std::wstring, CollectInfo>* collect_infos);
73-
Status process_segment(const std::string& seg_path, const io::FileSystemSPtr& fs,
78+
Status process_segment(const RowsetSharedPtr& rowset, int32_t seg_id,
7479
const TabletSchema* tablet_schema,
75-
const std::unordered_map<std::wstring, CollectInfo>& collect_infos);
80+
const std::unordered_map<std::wstring, CollectInfo>& collect_infos,
81+
io::IOContext* io_ctx);
7682

7783
uint64_t get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
7884
const std::wstring& term);

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,7 @@ Status OlapScanLocalState::_init_profile() {
196196
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
197197
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
198198

199+
_statistics_collect_timer = ADD_TIMER(_scanner_profile, "StatisticsCollectTime");
199200
_inverted_index_filter_counter =
200201
ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT);
201202
_inverted_index_filter_timer = ADD_TIMER(_segment_profile, "InvertedIndexFilterTime");

be/src/pipeline/exec/olap_scan_operator.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -188,6 +188,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
188188
// used by segment v2
189189
RuntimeProfile::Counter* _cached_pages_num_counter = nullptr;
190190

191+
RuntimeProfile::Counter* _statistics_collect_timer = nullptr;
191192
RuntimeProfile::Counter* _inverted_index_filter_counter = nullptr;
192193
RuntimeProfile::Counter* _inverted_index_filter_timer = nullptr;
193194
RuntimeProfile::Counter* _inverted_index_query_null_bitmap_timer = nullptr;

be/src/vec/exec/scan/olap_scanner.cpp

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -289,10 +289,20 @@ Status OlapScanner::prepare() {
289289
}
290290

291291
if (_tablet_reader_params.score_runtime) {
292+
SCOPED_TIMER(local_state->_statistics_collect_timer);
292293
_tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>();
294+
295+
io::IOContext io_ctx {
296+
.reader_type = ReaderType::READER_QUERY,
297+
.expiration_time = tablet->ttl_seconds(),
298+
.query_id = &_state->query_id(),
299+
.file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
300+
.is_inverted_index = true,
301+
};
302+
293303
RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
294304
_state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema,
295-
_tablet_reader_params.common_expr_ctxs_push_down));
305+
_tablet_reader_params.common_expr_ctxs_push_down, &io_ctx));
296306
}
297307

298308
_has_prepared = true;

be/test/olap/collection_statistics_test.cpp

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -327,7 +327,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptyRowsetSplits) {
327327

328328
std::vector<RowSetSplits> empty_splits;
329329

330-
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, expr_contexts);
330+
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, expr_contexts,
331+
nullptr);
331332
EXPECT_TRUE(status.ok()) << status.msg();
332333
}
333334

@@ -337,8 +338,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptyExpressions) {
337338

338339
std::vector<RowSetSplits> empty_splits;
339340

340-
auto status =
341-
stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, empty_contexts);
341+
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, empty_contexts,
342+
nullptr);
342343
EXPECT_TRUE(status.ok()) << status.msg();
343344
}
344345

@@ -353,7 +354,8 @@ TEST_F(CollectionStatisticsTest, CollectWithNonMatchExpression) {
353354

354355
std::vector<RowSetSplits> empty_splits;
355356

356-
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
357+
auto status =
358+
stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
357359
EXPECT_TRUE(status.ok()) << status.msg();
358360
}
359361

@@ -380,7 +382,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMultipleMatchExpressions) {
380382

381383
std::vector<RowSetSplits> empty_splits;
382384

383-
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
385+
auto status =
386+
stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
384387
EXPECT_TRUE(status.ok()) << status.msg();
385388
}
386389

@@ -407,7 +410,8 @@ TEST_F(CollectionStatisticsTest, CollectWithNestedExpressions) {
407410

408411
std::vector<RowSetSplits> empty_splits;
409412

410-
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
413+
auto status =
414+
stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
411415
EXPECT_TRUE(status.ok()) << status.msg();
412416
}
413417

@@ -417,7 +421,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMockRowsetSplits) {
417421

418422
auto splits = create_mock_rowset_splits(2);
419423

420-
auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts);
424+
auto status =
425+
stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr);
421426

422427
EXPECT_TRUE(status.ok());
423428
}
@@ -428,7 +433,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptySegments) {
428433

429434
auto splits = create_mock_rowset_splits(0);
430435

431-
auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts);
436+
auto status =
437+
stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr);
432438
EXPECT_TRUE(status.ok()) << status.msg();
433439
}
434440

@@ -450,7 +456,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMultipleRowsetSplits) {
450456
splits.push_back(split);
451457
}
452458

453-
auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts);
459+
auto status =
460+
stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr);
454461
EXPECT_TRUE(status.ok()) << status.msg();
455462
}
456463

@@ -581,7 +588,8 @@ TEST_F(CollectionStatisticsTest, CollectWithCastWrappedSlotRef) {
581588
contexts.push_back(std::make_shared<vectorized::VExprContext>(match_expr));
582589

583590
std::vector<RowSetSplits> empty_splits;
584-
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
591+
auto status =
592+
stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
585593
EXPECT_TRUE(status.ok()) << status.msg();
586594
}
587595

@@ -605,7 +613,8 @@ TEST_F(CollectionStatisticsTest, CollectWithDoubleCastWrappedSlotRef) {
605613
contexts.push_back(std::make_shared<vectorized::VExprContext>(match_expr));
606614

607615
std::vector<RowSetSplits> empty_splits;
608-
auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
616+
auto status =
617+
stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
609618
EXPECT_TRUE(status.ok()) << status.msg();
610619
}
611620

Lines changed: 77 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import java.util.regex.Pattern
19+
20+
suite('test_inverted_index_collection_stats', 'p0') {
21+
def indexTbName1 = "test_inverted_index_collection_stats_tbl"
22+
23+
sql "DROP TABLE IF EXISTS ${indexTbName1}"
24+
25+
sql """
26+
CREATE TABLE ${indexTbName1} (
27+
`id` int(11) NULL COMMENT "",
28+
`content` text NULL COMMENT "",
29+
INDEX content_idx (`content`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
30+
) ENGINE=OLAP
31+
DUPLICATE KEY(`id`)
32+
COMMENT "OLAP"
33+
DISTRIBUTED BY RANDOM BUCKETS 1
34+
PROPERTIES (
35+
"replication_allocation" = "tag.location.default: 1"
36+
);
37+
"""
38+
39+
sql """ INSERT INTO ${indexTbName1} VALUES (1, 'hello world'), (2, 'hello doris'), (3, 'doris is great') """
40+
41+
sql "sync"
42+
43+
// Enable profile
44+
sql """ set enable_profile = true; """
45+
sql """ set profile_level = 2; """
46+
sql """ set enable_common_expr_pushdown = true; """
47+
sql """ set enable_common_expr_pushdown_for_inverted_index = true; """
48+
49+
// Execute MATCH_ALL query which triggers CollectionStatistics::collect
50+
def queryId = "test_inverted_index_collection_stats_${System.currentTimeMillis()}"
51+
try {
52+
profile("${queryId}") {
53+
run {
54+
sql "/* ${queryId} */ select score() as score from ${indexTbName1} where content match_all 'hello' order by score desc limit 10"
55+
}
56+
57+
check { profileString, exception ->
58+
def statisticsCollectTime = 0
59+
def matcher = Pattern.compile("StatisticsCollectTime:\\s*(\\d+)").matcher(profileString)
60+
if (matcher.find()) {
61+
statisticsCollectTime = Integer.parseInt(matcher.group(1))
62+
log.info("StatisticsCollectTime: {}", statisticsCollectTime)
63+
}
64+
assertTrue(statisticsCollectTime > 0, "StatisticsCollectTime should be > 0, got: ${statisticsCollectTime}")
65+
}
66+
}
67+
} catch (Exception e) {
68+
if (e.message?.contains("HttpCliAction failed")) {
69+
log.warn("Profile HTTP request failed, skipping profile check: {}", e.message)
70+
} else {
71+
log.warn("Profile check failed: {}", e.message)
72+
throw e
73+
}
74+
} finally {
75+
// sql "DROP TABLE IF EXISTS ${indexTbName1}"
76+
}
77+
}

0 commit comments

Comments
 (0)