Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/index-tools/index_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void search(lucene::store::Directory* dir, std::string& field, std::string& toke
std::vector<std::string> terms = split(token, '|');

doris::TQueryOptions queryOptions;
ConjunctionQuery conjunct_query(s, queryOptions);
ConjunctionQuery conjunct_query(s, queryOptions, nullptr);
conjunct_query.add(field_ws, terms);
conjunct_query.search(result);

Expand Down
9 changes: 3 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,9 @@ Status Compaction::do_inverted_index_compaction() {
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
bool open_idx_file_cache = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache),
"inverted_index_file_reader init failed");
inverted_index_file_reader->init(config::inverted_index_read_buffer_size),
"inverted_index_file_reader init failed");
inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader);
}

Expand Down Expand Up @@ -785,9 +783,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(
config::inverted_index_read_buffer_size, open_idx_file_cache);
config::inverted_index_read_buffer_size);
index_file_path = inverted_index_file_reader->get_index_file_path(index_meta);
DBUG_EXECUTE_IF(
"Compaction::construct_skip_inverted_index_index_file_reader_init_"
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ Status ColumnReader::new_inverted_index_iterator(
{
std::shared_lock<std::shared_mutex> rlock(_load_index_lock);
if (_inverted_index) {
RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.stats,
RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.io_ctx, read_options.stats,
read_options.runtime_state, iterator));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
namespace doris::segment_v2 {

ConjunctionQuery::ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_io_ctx(io_ctx),
_index_version(_searcher->getReader()->getIndexVersion()),
_conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold) {}

Expand All @@ -48,7 +49,7 @@ void ConjunctionQuery::add(const std::wstring& field_name, const std::vector<std
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
iterators.emplace_back(term_doc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace doris::segment_v2 {
class ConjunctionQuery : public Query {
public:
ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~ConjunctionQuery() override;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand All @@ -41,6 +41,7 @@ class ConjunctionQuery : public Query {

public:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

IndexVersion _index_version = IndexVersion::kV0;
int32_t _conjunction_ratio = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
namespace doris::segment_v2 {

DisjunctionQuery::DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
: _searcher(searcher) {}
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher), _io_ctx(io_ctx) {}

void DisjunctionQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) {
if (terms.empty()) {
Expand All @@ -36,7 +36,7 @@ void DisjunctionQuery::search(roaring::Roaring& roaring) {
auto func = [this, &roaring](const std::string& term, bool first) {
std::wstring ws_term = StringUtil::string_to_wstring(term);
auto* t = _CLNEW Term(_field_name.c_str(), ws_term.c_str());
auto* term_doc = _searcher->getReader()->termDocs(t);
auto* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
TermIterator iterator(term_doc);

DocRange doc_range;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ namespace doris::segment_v2 {
class DisjunctionQuery : public Query {
public:
DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~DisjunctionQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
void search(roaring::Roaring& roaring) override;

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

std::wstring _field_name;
std::vector<std::string> _terms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
namespace doris::segment_v2 {

PhraseEdgeQuery::PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhraseEdgeQuery : public Query {
public:
PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhraseEdgeQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
namespace doris::segment_v2 {

PhrasePrefixQuery::PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options,
const io::IOContext* io_ctx)
: _searcher(searcher),
_query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()),
_max_expansions(query_options.inverted_index_max_expansions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace doris::segment_v2 {
class PhrasePrefixQuery : public Query {
public:
PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhrasePrefixQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& terms) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ bool OrderedSloppyPhraseMatcher::stretch_to_order(PostingsAndPosition* prev_post
}

PhraseQuery::PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
: _searcher(searcher) {}
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher), _io_ctx(io_ctx) {}

PhraseQuery::~PhraseQuery() {
for (auto& term_doc : _term_docs) {
Expand Down Expand Up @@ -173,7 +173,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(terms[0]);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t);
TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx);
_term_docs.push_back(term_doc);
_lead1 = TermIterator(term_doc);
return;
Expand All @@ -185,7 +185,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str
std::wstring ws_term = StringUtil::string_to_wstring(term);
Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str());
_terms.push_back(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t);
TermPositions* term_pos = _searcher->getReader()->termPositions(t, _io_ctx);
_term_docs.push_back(term_pos);
if (is_save_iter) {
iterators.emplace_back(term_pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ using Matcher = std::variant<ExactPhraseMatcher, OrderedSloppyPhraseMatcher>;
class PhraseQuery : public Query {
public:
PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~PhraseQuery() override;

void add(const InvertedIndexQueryInfo& query_info) override;
Expand All @@ -112,6 +112,7 @@ class PhraseQuery : public Query {

private:
std::shared_ptr<lucene::search::IndexSearcher> _searcher;
const io::IOContext* _io_ctx = nullptr;

TermIterator _lead1;
TermIterator _lead2;
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/inverted_index/query/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <memory>

#include "common/status.h"
#include "io/io_common.h"
#include "roaring/roaring.hh"

CL_NS_USE(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
namespace doris::segment_v2 {

RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options)
const TQueryOptions& query_options, const io::IOContext* io_ctx)
: _searcher(searcher),
_max_expansions(query_options.inverted_index_max_expansions),
_query(searcher, query_options) {}
_query(searcher, query_options, io_ctx) {}

void RegexpQuery::add(const std::wstring& field_name, const std::vector<std::string>& patterns) {
if (patterns.size() != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace doris::segment_v2 {
class RegexpQuery : public Query {
public:
RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher,
const TQueryOptions& query_options);
const TQueryOptions& query_options, const io::IOContext* io_ctx);
~RegexpQuery() override = default;

void add(const std::wstring& field_name, const std::vector<std::string>& patterns) override;
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
CL_NS(store)::IndexInput* base;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
bool _is_index_file = false; // Indicates if the file is a TII file

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
Expand All @@ -75,6 +77,8 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getDirectoryType() const override { return DorisCompoundReader::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
void setIndexFile(bool isIndexFile) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
Expand All @@ -92,9 +96,12 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
base->setIoContext(_io_ctx);
base->setIndexFile(_is_index_file);
base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
base->setIoContext(nullptr);
}

CSIndexInput::~CSIndexInput() = default;
Expand All @@ -111,6 +118,14 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone

void CSIndexInput::close() {}

void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

void CSIndexInput::setIndexFile(bool isIndexFile) {
_is_index_file = isIndexFile;
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
Expand Down
5 changes: 1 addition & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@

namespace doris::segment_v2 {

Status InvertedIndexFileReader::init(int32_t read_buffer_size, bool open_idx_file_cache) {
Status InvertedIndexFileReader::init(int32_t read_buffer_size) {
if (!_inited) {
_read_buffer_size = read_buffer_size;
_open_idx_file_cache = open_idx_file_cache;
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
auto st = _init_from_v2(read_buffer_size);
if (!st.ok()) {
Expand Down Expand Up @@ -76,7 +75,6 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) {
"CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path,
err.what());
}
index_input->setIdxFileCache(_open_idx_file_cache);
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);

// 3. read file
Expand Down Expand Up @@ -198,7 +196,6 @@ Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open(
}

// 3. read file in DorisCompoundReader
index_input->setIdxFileCache(_open_idx_file_cache);
compound_reader = std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size);
} catch (CLuceneError& err) {
return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ class InvertedIndexFileReader {
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}

Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size,
bool open_idx_file_cache = false);
Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size);
Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const;
void debug_file_entries();
std::string get_index_file_cache_key(const TabletIndex* index_meta) const;
Expand All @@ -80,7 +79,6 @@ class InvertedIndexFileReader {
const io::FileSystemSPtr _fs;
std::string _index_path_prefix;
int32_t _read_buffer_size = -1;
bool _open_idx_file_cache = false;
InvertedIndexStorageFormatPB _storage_format;
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
Expand Down
37 changes: 36 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,27 @@ void DorisFSDirectory::FSIndexInput::close() {
}*/
}

// Updates this input's own IOContext from a caller-supplied one.
//
// io_ctx is an opaque pointer that is interpreted as `const io::IOContext*`.
// Only reader_type, query_id and file_cache_stats are taken from it; the
// is_index_data field is not touched here. When io_ctx is null those three
// fields are reset to ReaderType::UNKNOWN / nullptr / nullptr.
void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
    if (io_ctx == nullptr) {
        // No context supplied: fall back to the neutral values.
        _io_ctx.reader_type = ReaderType::UNKNOWN;
        _io_ctx.query_id = nullptr;
        _io_ctx.file_cache_stats = nullptr;
        return;
    }

    const auto* source = static_cast<const io::IOContext*>(io_ctx);
    _io_ctx.reader_type = source->reader_type;
    _io_ctx.query_id = source->query_id;
    _io_ctx.file_cache_stats = source->file_cache_stats;
}

// Exposes this input's IOContext member as an opaque pointer.
// The returned address refers to the member itself, not to a copy.
const void* DorisFSDirectory::FSIndexInput::getIoContext() {
    const auto* own_ctx = &_io_ctx;
    return own_ctx;
}

void DorisFSDirectory::FSIndexInput::setIndexFile(bool isIndexFile) {
_io_ctx.is_index_data = isIndexFile;
}

void DorisFSDirectory::FSIndexInput::seekInternal(const int64_t position) {
CND_PRECONDITION(position >= 0 && position < _handle->_length, "Seeking out of range");
_pos = position;
Expand All @@ -239,9 +260,23 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len)
_handle->_fpos = _pos;
}

DBUG_EXECUTE_IF(
"DorisFSDirectory::FSIndexInput::readInternal", ({
static thread_local std::unordered_map<const TUniqueId*, io::FileCacheStatistics*>
thread_file_cache_map;
auto it = thread_file_cache_map.find(_io_ctx.query_id);
if (it != thread_file_cache_map.end()) {
if (_io_ctx.file_cache_stats != it->second) {
_CLTHROWA(CL_ERR_IO, "File cache statistics mismatch");
}
} else {
thread_file_cache_map[_io_ctx.query_id] = _io_ctx.file_cache_stats;
}
}));

Slice result {b, (size_t)len};
size_t bytes_read = 0;
auto st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
Status st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", {
st = Status::InternalError(
"debug point: DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error");
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
: BufferedIndexInput(buffer_size) {
this->_pos = 0;
this->_handle = std::move(handle);
this->_io_ctx.reader_type = ReaderType::READER_QUERY;
this->_io_ctx.is_index_data = false;
}

protected:
Expand All @@ -199,8 +197,9 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput
const char* getDirectoryType() const override { return DorisFSDirectory::getClassName(); }
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "FSIndexInput"; }

void setIdxFileCache(bool index) override { _io_ctx.is_index_data = index; }
void setIoContext(const void* io_ctx) override;
const void* getIoContext() override;
void setIndexFile(bool isIndexFile) override;

std::mutex _this_lock;

Expand Down
Loading