Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* inverted_index_bytes_scanned_from_remote = nullptr;
RuntimeProfile::Counter* inverted_index_local_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_remote_io_timer = nullptr;
RuntimeProfile::Counter* inverted_index_io_timer = nullptr;

FileCacheProfileReporter(RuntimeProfile* profile) {
static const char* cache_profile = "FileCache";
Expand Down Expand Up @@ -134,6 +135,8 @@ struct FileCacheProfileReporter {
profile, "InvertedIndexLocalIOUseTimer", cache_profile, 1);
inverted_index_remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(
profile, "InvertedIndexRemoteIOUseTimer", cache_profile, 1);
inverted_index_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "InvertedIndexIOTimer", cache_profile, 1);
}

void update(const FileCacheStatistics* statistics) const {
Expand Down Expand Up @@ -162,6 +165,7 @@ struct FileCacheProfileReporter {
statistics->inverted_index_bytes_read_from_remote);
COUNTER_UPDATE(inverted_index_local_io_timer, statistics->inverted_index_local_io_timer);
COUNTER_UPDATE(inverted_index_remote_io_timer, statistics->inverted_index_remote_io_timer);
COUNTER_UPDATE(inverted_index_io_timer, statistics->inverted_index_io_timer);
}
};

Expand Down
1 change: 1 addition & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct FileCacheStatistics {
int64_t inverted_index_bytes_read_from_remote = 0;
int64_t inverted_index_local_io_timer = 0;
int64_t inverted_index_remote_io_timer = 0;
int64_t inverted_index_io_timer = 0;
};

struct IOContext {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ struct OlapReaderStatistics {
int64_t inverted_index_query_bitmap_copy_timer = 0;
int64_t inverted_index_searcher_open_timer = 0;
int64_t inverted_index_searcher_search_timer = 0;
int64_t inverted_index_searcher_search_init_timer = 0;
int64_t inverted_index_searcher_search_exec_timer = 0;
int64_t inverted_index_searcher_cache_hit = 0;
int64_t inverted_index_searcher_cache_miss = 0;
int64_t inverted_index_downgrade_count = 0;
Expand Down
57 changes: 34 additions & 23 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,35 +204,46 @@ void DorisFSDirectory::FSIndexInput::seekInternal(const int64_t position) {
void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) {
CND_PRECONDITION(_handle != nullptr, "shared file handle has closed");
CND_PRECONDITION(_handle->_reader != nullptr, "file is not open");
std::lock_guard<std::mutex> wlock(_handle->_shared_lock);

int64_t position = getFilePointer();
if (_pos != position) {
_pos = position;
}
int64_t inverted_index_io_timer = 0;
{
SCOPED_RAW_TIMER(&inverted_index_io_timer);

std::lock_guard<std::mutex> wlock(_handle->_shared_lock);

int64_t position = getFilePointer();
if (_pos != position) {
_pos = position;
}

if (_handle->_fpos != _pos) {
_handle->_fpos = _pos;
}

if (_handle->_fpos != _pos) {
Slice result {b, (size_t)len};
size_t bytes_read = 0;
Status st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", {
st = Status::InternalError(
"debug point: "
"DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error");
})
if (!st.ok()) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
bufferLength = len;
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_bytes_read_error",
{ bytes_read = len + 10; })
if (bytes_read != len) {
_CLTHROWA(CL_ERR_IO, "read error");
}
_pos += bufferLength;
_handle->_fpos = _pos;
}

Slice result {b, (size_t)len};
size_t bytes_read = 0;
Status st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", {
st = Status::InternalError(
"debug point: DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error");
})
if (!st.ok()) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
bufferLength = len;
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_bytes_read_error",
{ bytes_read = len + 10; })
if (bytes_read != len) {
_CLTHROWA(CL_ERR_IO, "read error");
if (_io_ctx.file_cache_stats != nullptr) {
_io_ctx.file_cache_stats->inverted_index_io_timer += inverted_index_io_timer;
}
_pos += bufferLength;
_handle->_fpos = _pos;
}

void DorisFSDirectory::FSIndexOutput::init(const io::FileSystemSPtr& fs, const char* path) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
void seekInternal(const int64_t position) override;
// IndexInput methods
void readInternal(uint8_t* b, const int32_t len) override;

friend class DorisFSDirectoryTest;
};

class DorisFSDirectory::FSIndexOutput : public lucene::store::BufferedIndexOutput {
Expand Down
17 changes: 12 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,14 @@ Status InvertedIndexReader::match_index_search(
return Status::Error<ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS>(
"query type " + query_type_to_string(query_type) + ", query is nullptr");
}
query->add(query_info);
query->search(*term_match_bitmap);
{
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_init_timer);
query->add(query_info);
}
{
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_exec_timer);
query->search(*term_match_bitmap);
}
} catch (const CLuceneError& e) {
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
e.what());
Expand Down Expand Up @@ -615,10 +621,11 @@ Status BkdIndexReader::invoke_bkd_try_query(const io::IOContext* io_ctx, const v
return Status::OK();
}

Status BkdIndexReader::invoke_bkd_query(const io::IOContext* io_ctx, const void* query_value,
InvertedIndexQueryType query_type,
Status BkdIndexReader::invoke_bkd_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
const void* query_value, InvertedIndexQueryType query_type,
std::shared_ptr<lucene::util::bkd::bkd_reader> r,
std::shared_ptr<roaring::Roaring>& bit_map) {
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer);
switch (query_type) {
case InvertedIndexQueryType::LESS_THAN_QUERY: {
auto visitor =
Expand Down Expand Up @@ -733,7 +740,7 @@ Status BkdIndexReader::query(const io::IOContext* io_ctx, OlapReaderStatistics*
return Status::OK();
}

RETURN_IF_ERROR(invoke_bkd_query(io_ctx, query_value, query_type, r, bit_map));
RETURN_IF_ERROR(invoke_bkd_query(io_ctx, stats, query_value, query_type, r, bit_map));
bit_map->runOptimize();
cache->insert(cache_key, bit_map, &cache_handler);

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ class BkdIndexReader : public InvertedIndexReader {
Status invoke_bkd_try_query(const io::IOContext* io_ctx, const void* query_value,
InvertedIndexQueryType query_type,
std::shared_ptr<lucene::util::bkd::bkd_reader> r, uint32_t* count);
Status invoke_bkd_query(const io::IOContext* io_ctx, const void* query_value,
InvertedIndexQueryType query_type,
Status invoke_bkd_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats,
const void* query_value, InvertedIndexQueryType query_type,
std::shared_ptr<lucene::util::bkd::bkd_reader> r,
std::shared_ptr<roaring::Roaring>& bit_map);
template <InvertedIndexQueryType QT>
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ Status OlapScanLocalState::_init_profile() {
ADD_TIMER(_segment_profile, "InvertedIndexSearcherOpenTime");
_inverted_index_searcher_search_timer =
ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchTime");
_inverted_index_searcher_search_init_timer =
ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchInitTime");
_inverted_index_searcher_search_exec_timer =
ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchExecTime");
_inverted_index_searcher_cache_hit_counter =
ADD_COUNTER(_segment_profile, "InvertedIndexSearcherCacheHit", TUnit::UNIT);
_inverted_index_searcher_cache_miss_counter =
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
RuntimeProfile::Counter* _inverted_index_query_bitmap_copy_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_searcher_open_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_searcher_search_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_searcher_search_init_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_searcher_search_exec_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_searcher_cache_hit_counter = nullptr;
RuntimeProfile::Counter* _inverted_index_searcher_cache_miss_counter = nullptr;
RuntimeProfile::Counter* _inverted_index_downgrade_count_counter = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,10 @@ void OlapScanner::_collect_profile_before_close() {
stats.inverted_index_searcher_open_timer);
COUNTER_UPDATE(local_state->_inverted_index_searcher_search_timer,
stats.inverted_index_searcher_search_timer);
COUNTER_UPDATE(local_state->_inverted_index_searcher_search_init_timer,
stats.inverted_index_searcher_search_init_timer);
COUNTER_UPDATE(local_state->_inverted_index_searcher_search_exec_timer,
stats.inverted_index_searcher_search_exec_timer);
COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_hit_counter,
stats.inverted_index_searcher_cache_hit);
COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_miss_counter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ class IndexCompactionUtils {
PrimitiveType::TYPE_INT, &param_value, query_param)
.ok());
auto result = std::make_shared<roaring::Roaring>();
OlapReaderStatistics stats;
EXPECT_TRUE(idx_reader
->invoke_bkd_query(nullptr, query_param->get_value(),
->invoke_bkd_query(nullptr, &stats, query_param->get_value(),
InvertedIndexQueryType::EQUAL_QUERY,
*bkd_searcher, result)
.ok());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,4 +778,47 @@ TEST_F(DorisFSDirectoryTest, FSIndexOutputFlushBufferWithNullBuffer) {
delete output;
}

TEST_F(DorisFSDirectoryTest, FSIndexInputReadInternalTimer) {
std::string file_name = "test_timer_file";
std::filesystem::path test_file = _tmp_dir / file_name;
std::ofstream ofs(test_file);
std::string content = "some test content for timer";
ofs << content;
ofs.close();

lucene::store::IndexInput* input1 = nullptr;
CLuceneError error;
bool result =
DorisFSDirectory::FSIndexInput::open(_fs, test_file.string().c_str(), input1, error);
EXPECT_TRUE(result);
ASSERT_NE(input1, nullptr);

auto* fs_input1 = dynamic_cast<DorisFSDirectory::FSIndexInput*>(input1);
ASSERT_NE(fs_input1, nullptr);

io::FileCacheStatistics stats;
fs_input1->_io_ctx.file_cache_stats = &stats;

auto* input2 = fs_input1->clone();
auto* fs_input2 = dynamic_cast<DorisFSDirectory::FSIndexInput*>(input2);
ASSERT_NE(fs_input2, nullptr);

fs_input2->_io_ctx.file_cache_stats = &stats;

uint8_t buffer1[10];
input1->readBytes(buffer1, 10);
EXPECT_GT(stats.inverted_index_io_timer, 0);
int64_t old_time = stats.inverted_index_io_timer;

std::this_thread::sleep_for(std::chrono::milliseconds(100));

input2->seek(0);
uint8_t buffer2[10];
input2->readBytes(buffer2, 10);
EXPECT_GT(stats.inverted_index_io_timer, old_time);

_CLDELETE(input2);
_CLDELETE(input1);
}

} // namespace doris::segment_v2
Loading