Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct FileCacheProfile {
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
Expand All @@ -90,6 +91,8 @@ struct FileCacheProfileReporter {
cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1);
write_cache_io_timer =
Expand All @@ -107,6 +110,8 @@ struct FileCacheProfileReporter {
void update(const FileCacheStatistics* statistics) const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
COUNTER_UPDATE(num_inverted_index_remote_io_total,
statistics->num_inverted_index_remote_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
Expand Down
8 changes: 6 additions & 2 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
_update_state(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
Expand Down Expand Up @@ -312,14 +312,18 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}

void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
FileCacheStatistics* statis) const {
FileCacheStatistics* statis,
bool is_inverted_index) const {
if (statis == nullptr) {
return;
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
if (is_inverted_index) {
statis->num_inverted_index_remote_io_total++;
}
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class CachedRemoteFileReader final : public FileReader {
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
};
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const;
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
};

} // namespace doris::io
2 changes: 2 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace io {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
int64_t num_inverted_index_remote_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
Expand All @@ -60,6 +61,7 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
bool is_inverted_index = false;
};

} // namespace io
Expand Down
11 changes: 9 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
base->setIoContext(_io_ctx);

if (_io_ctx) {
base->setIoContext(_io_ctx);
}

base->setIndexFile(_is_index_file);
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
base->setIoContext(nullptr);

if (_io_ctx) {
base->setIoContext(nullptr);
}
}

CSIndexInput::~CSIndexInput() = default;
Expand Down
13 changes: 10 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,27 @@

namespace doris::segment_v2 {

Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) {
if (!_inited) {
_read_buffer_size = read_buffer_size;
if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
auto st = _init_from(read_buffer_size);
auto st = _init_from(read_buffer_size, io_ctx);
if (!st.ok()) {
return st;
}
}
_inited = true;
} else {
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
}
}
}
return Status::OK();
}

Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size) {
Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) {
auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);

std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
Expand Down Expand Up @@ -76,6 +82,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size) {
err.what());
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);

// 3. read file
int32_t version = _stream->readInt(); // Read version number
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class InvertedIndexFileReader {
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}

Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size);
Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size,
const io::IOContext* io_ctx = nullptr);
Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
Expand All @@ -70,7 +71,7 @@ class InvertedIndexFileReader {
int64_t get_inverted_file_size() const { return _stream == nullptr ? 0 : _stream->length(); }

private:
Status _init_from(int32_t read_buffer_size);
Status _init_from(int32_t read_buffer_size, const io::IOContext* io_ctx);
Result<std::unique_ptr<DorisCompoundReader>> _open(int64_t index_id,
const std::string& index_suffix) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
: BufferedIndexInput(buffer_size) {
this->_pos = 0;
this->_handle = std::move(handle);
_io_ctx.is_inverted_index = true;
}

protected:
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx,

if (!dir) {
// TODO: ugly code here, try to refact.
auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
io_ctx);
if (!st.ok()) {
LOG(WARNING) << st;
return st;
Expand All @@ -137,7 +138,6 @@ Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx,
InvertedIndexDescriptor::get_temporary_null_bitmap_file_name();
if (dir->fileExists(null_bitmap_file_name)) {
null_bitmap_in = dir->openInput(null_bitmap_file_name);
null_bitmap_in->setIoContext(io_ctx);
size_t null_bitmap_size = null_bitmap_in->length();
faststring buf;
buf.resize(null_bitmap_size);
Expand Down Expand Up @@ -180,7 +180,8 @@ Status InvertedIndexReader::handle_searcher_cache(
SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer);
IndexSearcherPtr searcher;

auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size);
auto st =
_inverted_index_file_reader->init(config::inverted_index_read_buffer_size, io_ctx);
if (!st.ok()) {
LOG(WARNING) << st;
return st;
Expand Down Expand Up @@ -211,6 +212,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
auto searcher_result = DORIS_TRY(index_searcher_builder->get_index_searcher(dir));
*searcher = searcher_result;

// When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries.
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);

// NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ suite("test_index_io_context", "nonConcurrent") {

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexInput::readInternal")

qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """
qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """
qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """
Expand Down
Loading