diff --git a/be/src/clucene b/be/src/clucene index 569398a5c96b4c..8d8f92ef8ddd0e 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit 569398a5c96b4c626251ccbe81257945a3d2aef4 +Subproject commit 8d8f92ef8ddd0e50b6fc76f8f6572abaef1b5213 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index dcbdca921ab8e8..5f3679aaeae350 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -17,6 +17,7 @@ #include "inverted_index_compaction.h" +#include "inverted_index_common.h" #include "inverted_index_file_writer.h" #include "inverted_index_fs_directory.h" #include "io/fs/local_file_system.h" @@ -41,15 +42,17 @@ Status compact_column(int64_t index_id, "debug point: index compaction error"); } }) + bool can_use_ram_dir = true; - lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory( - io::global_local_filesystem(), tmp_path.data(), can_use_ram_dir); + std::unique_ptr dir( + DorisFSDirectoryFactory::getDirectory(io::global_local_filesystem(), tmp_path.data(), + can_use_ram_dir)); DBUG_EXECUTE_IF("compact_column_getDirectory_error", { _CLTHROWA(CL_ERR_IO, "debug point: compact_column_getDirectory_error in index compaction"); }) lucene::analysis::SimpleAnalyzer analyzer; - auto* index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */, - true /* closeDirOnShutdown */); + std::unique_ptr index_writer(_CLNEW lucene::index::IndexWriter( + dir.get(), &analyzer, true /* create */, true /* closeDirOnShutdown */)); DBUG_EXECUTE_IF("compact_column_create_index_writer_error", { _CLTHROWA(CL_ERR_IO, "debug point: compact_column_create_index_writer_error in index compaction"); @@ -71,11 +74,6 @@ Status compact_column(int64_t index_id, _CLTHROWA(CL_ERR_IO, "debug point: compact_column_index_writer_close_error in index compaction"); }) - _CLDELETE(index_writer); - // NOTE: need to ref_cnt-- for dir, - // when index_writer is destroyed, if closeDir is set, dir will be close - // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed. - _CLDECDELETE(dir) // delete temporary segment_path, only when inverted_index_ram_dir_enable is false if (!config::inverted_index_ram_dir_enable) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 86efe86ca43389..37878689cfdde0 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -22,10 +22,10 @@ #include #include #include -#include -#include -#include +#include +#include +#include #include #include @@ -34,22 +34,13 @@ #include "olap/tablet_schema.h" #include "util/debug_points.h" -namespace doris { -namespace io { +namespace doris::io { class FileWriter; -} // namespace io -} // namespace doris +} // namespace doris::io #define BUFFER_LENGTH 16384 #define CL_MAX_PATH 4096 - -#define STRDUP_WtoA(x) CL_NS(util)::Misc::_wideToChar(x) -#define STRDUP_TtoA STRDUP_WtoA - -using FileWriterPtr = std::unique_ptr; - -namespace doris { -namespace segment_v2 { +namespace doris::segment_v2 { /** Implementation of an IndexInput that reads from a portion of the * compound file. @@ -94,7 +85,7 @@ CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& fi void CSIndexInput::readInternal(uint8_t* b, const int32_t len) { std::lock_guard wlock(((DorisFSDirectory::FSIndexInput*)base)->_this_lock); - int64_t start = getFilePointer(); + auto start = getFilePointer(); if (start + len > _length) { _CLTHROWA(CL_ERR_IO, "read past EOF"); } @@ -153,56 +144,63 @@ void CSIndexInput::setIoContext(const void* io_ctx) { } DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, - EntriesType* entries_clone, int32_t read_buffer_size, + const EntriesType& entries_clone, int32_t read_buffer_size, const io::IOContext* io_ctx) : _stream(stream), - _entries(_CLNEW EntriesType(true, true)), + _entries(std::make_unique()), _read_buffer_size(read_buffer_size) { // After stream clone, the io_ctx needs to be reconfigured. initialize(io_ctx); - for (auto& e : *entries_clone) { - auto* origin_entry = e.second; - auto* entry = _CLNEW ReaderFileEntry(); - char* aid = strdup(e.first); + for (const auto& e : entries_clone) { + const auto& origin_entry = e.second; + auto entry = std::make_unique(); entry->file_name = origin_entry->file_name; entry->offset = origin_entry->offset; entry->length = origin_entry->length; - _entries->put(aid, entry); + (*_entries)[e.first] = std::move(entry); } }; DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size, const io::IOContext* io_ctx) - : _ram_dir(new lucene::store::RAMDirectory()), + : _ram_dir(std::make_unique()), _stream(stream), - _entries(_CLNEW EntriesType(true, true)), + _entries(std::make_unique()), _read_buffer_size(read_buffer_size) { // After stream clone, the io_ctx needs to be reconfigured. initialize(io_ctx); try { int32_t count = _stream->readVInt(); - ReaderFileEntry* entry = nullptr; - TCHAR tid[CL_MAX_PATH]; uint8_t buffer[BUFFER_LENGTH]; for (int32_t i = 0; i < count; i++) { - entry = _CLNEW ReaderFileEntry(); - stream->readString(tid, CL_MAX_PATH); - char* aid = STRDUP_TtoA(tid); - entry->file_name = aid; + auto entry = std::make_unique(); + // Read the string length first + int32_t string_length = stream->readVInt(); + // Allocate appropriate buffer for the string + std::wstring tid; + tid.resize(string_length); + // Read the string characters directly + stream->readChars(tid.data(), 0, string_length); + std::string file_name_str(tid.begin(), tid.end()); + entry->file_name = file_name_str; entry->offset = stream->readLong(); entry->length = stream->readLong(); + VLOG_DEBUG << "string_length:" << string_length << " file_name:" << entry->file_name + << " offset:" << entry->offset << " length:" << entry->length; DBUG_EXECUTE_IF("construct_DorisCompoundReader_failed", { CLuceneError err; err.set(CL_ERR_IO, "construct_DorisCompoundReader_failed"); throw err; }) - _entries->put(aid, entry); // read header file data if (entry->offset < 0) { - copyFile(entry->file_name.c_str(), entry->length, buffer, BUFFER_LENGTH); + //if offset is -1, it means it's size is lower than DorisFSDirectory::MAX_HEADER_DATA_SIZE, which is 128k. + copyFile(entry->file_name.c_str(), static_cast(entry->length), buffer, + BUFFER_LENGTH); } + _entries->emplace(std::move(file_name_str), std::move(entry)); } } catch (...) { try { @@ -212,11 +210,9 @@ DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32 } if (_entries != nullptr) { _entries->clear(); - _CLDELETE(_entries); } if (_ram_dir) { _ram_dir->close(); - _CLDELETE(_ram_dir) } } catch (CLuceneError& err) { if (err.number() != CL_ERR_IO) { @@ -231,11 +227,12 @@ void DorisCompoundReader::copyFile(const char* file, int64_t file_length, uint8_ int64_t buffer_length) { std::unique_ptr output(_ram_dir->createOutput(file)); int64_t start_ptr = output->getFilePointer(); - int64_t remainder = file_length; - int64_t chunk = buffer_length; + auto remainder = file_length; + auto chunk = buffer_length; + auto batch_len = file_length < chunk ? file_length : chunk; while (remainder > 0) { - int64_t len = std::min(std::min(chunk, file_length), remainder); + auto len = remainder < batch_len ? remainder : batch_len; _stream->readBytes(buffer, len); output->writeBytes(buffer, len); remainder -= len; @@ -270,7 +267,6 @@ DorisCompoundReader::~DorisCompoundReader() { LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what(); } } - _CLDELETE(_entries) } const char* DorisCompoundReader::getClassName() { @@ -284,8 +280,8 @@ bool DorisCompoundReader::list(std::vector* names) const { if (_closed || _entries == nullptr) { _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); } - for (EntriesType::const_iterator i = _entries->begin(); i != _entries->end(); i++) { - names->push_back(i->first); + for (const auto& entry : *_entries) { + names->push_back(entry.first); } return true; } @@ -294,7 +290,7 @@ bool DorisCompoundReader::fileExists(const char* name) const { if (_closed || _entries == nullptr) { _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); } - return _entries->exists((char*)name); + return _entries->find(std::string(name)) != _entries->end(); } int64_t DorisCompoundReader::fileModified(const char* name) const { @@ -305,15 +301,15 @@ int64_t DorisCompoundReader::fileLength(const char* name) const { if (_closed || _entries == nullptr) { _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); } - ReaderFileEntry* e = _entries->get((char*)name); - if (e == nullptr) { + auto it = _entries->find(std::string(name)); + if (it == _entries->end()) { char buf[CL_MAX_PATH + 30]; strcpy(buf, "File "); strncat(buf, name, CL_MAX_PATH); strcat(buf, " does not exist"); _CLTHROWA(CL_ERR_IO, buf); } - return e->length; + return it->second->length; } bool DorisCompoundReader::openInput(const char* name, @@ -338,14 +334,16 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput* return false; } - const ReaderFileEntry* entry = _entries->get((char*)name); - if (entry == nullptr) { + auto it = _entries->find(std::string(name)); + if (it == _entries->end()) { char buf[CL_MAX_PATH + 26]; snprintf(buf, CL_MAX_PATH + 26, "No sub-file with id %s found", name); error.set(CL_ERR_IO, buf); return false; } + const auto& entry = it->second; + // If file is in RAM, just return. if (_ram_dir && _ram_dir->fileExists(name)) { return _ram_dir->openInput(name, ret, error, bufferSize); @@ -374,7 +372,6 @@ void DorisCompoundReader::close() { } if (_ram_dir) { _ram_dir->close(); - _CLDELETE(_ram_dir) } _closed = true; } @@ -400,7 +397,7 @@ lucene::store::IndexOutput* DorisCompoundReader::createOutput(const char* /*name } std::string DorisCompoundReader::toString() const { - return std::string("DorisCompoundReader@"); + return "DorisCompoundReader@"; } CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() { @@ -412,5 +409,4 @@ void DorisCompoundReader::initialize(const io::IOContext* io_ctx) { _stream->setIdxFileCache(true); } -} // namespace segment_v2 -} // namespace doris +} // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index 4a687e4ed3ea75..09e6faaeb9198e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -24,12 +24,13 @@ #include #include #include -#include +#include #include #include #include #include +#include #include #include @@ -43,9 +44,7 @@ namespace lucene::store { class RAMDirectory; } // namespace lucene::store -namespace doris { -class TabletIndex; -namespace segment_v2 { +namespace doris::segment_v2 { class ReaderFileEntry : LUCENE_BASE { public: @@ -60,16 +59,14 @@ class ReaderFileEntry : LUCENE_BASE { ~ReaderFileEntry() override = default; }; -using EntriesType = - lucene::util::CLHashMap>; +using EntriesType = std::unordered_map>; + class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { private: - lucene::store::RAMDirectory* _ram_dir = nullptr; + std::unique_ptr _ram_dir; CL_NS(store)::IndexInput* _stream = nullptr; // The life cycle of _entries should be consistent with that of the DorisCompoundReader. - EntriesType* _entries = nullptr; + std::unique_ptr _entries; std::mutex _this_lock; bool _closed = false; int32_t _read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE; @@ -79,7 +76,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { bool doDeleteFile(const char* name) override; public: - DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType* entries_clone, + DorisCompoundReader(CL_NS(store)::IndexInput* stream, const EntriesType& entries_clone, int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE, const io::IOContext* io_ctx = nullptr); DorisCompoundReader(CL_NS(store)::IndexInput* stream, @@ -109,5 +106,4 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { void initialize(const io::IOContext* io_ctx); }; -} // namespace segment_v2 -} // namespace doris +} // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp index 1b9440ae14bec8..79c57949ca9b69 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp @@ -84,7 +84,6 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I if (version == InvertedIndexStorageFormatPB::V2) { DCHECK(version == _storage_format); int32_t numIndices = _stream->readInt(); // Read number of indices - ReaderFileEntry* entry = nullptr; for (int32_t i = 0; i < numIndices; ++i) { int64_t indexId = _stream->readLong(); // Read index ID @@ -95,23 +94,19 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I int32_t numFiles = _stream->readInt(); // Read number of files in the index - // true, true means it will deconstruct key and value - auto fileEntries = std::make_unique(true, true); + auto fileEntries = std::make_unique(); + fileEntries->reserve(numFiles); for (int32_t j = 0; j < numFiles; ++j) { - entry = _CLNEW ReaderFileEntry(); - int32_t file_name_length = _stream->readInt(); - // aid will destruct in EntriesType map. - char* aid = (char*)malloc(file_name_length + 1); - _stream->readBytes(reinterpret_cast(aid), file_name_length); - aid[file_name_length] = '\0'; - //stream->readString(tid, CL_MAX_PATH); - entry->file_name = std::string(aid, file_name_length); + std::string file_name(file_name_length, '\0'); + _stream->readBytes(reinterpret_cast(file_name.data()), + file_name_length); + auto entry = std::make_unique(); + entry->file_name = std::move(file_name); entry->offset = _stream->readLong(); entry->length = _stream->readLong(); - - fileEntries->put(aid, entry); + fileEntries->emplace(entry->file_name, std::move(entry)); } _indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)), @@ -223,8 +218,8 @@ Result> InvertedIndexFileReader::_open( errMsg.str())); } // Need to clone resource here, because index searcher cache need it. - compound_reader = std::make_unique( - _stream->clone(), index_it->second.get(), _read_buffer_size, io_ctx); + compound_reader = std::make_unique(_stream->clone(), *index_it->second, + _read_buffer_size, io_ctx); } return compound_reader; } @@ -290,13 +285,14 @@ Status InvertedIndexFileReader::has_null(const TabletIndex* index_meta, bool* re if (index_it == _indices_entries.end()) { *res = false; } else { - auto* entries = index_it->second.get(); - ReaderFileEntry* e = - entries->get((char*)InvertedIndexDescriptor::get_temporary_null_bitmap_file_name()); - if (e == nullptr) { + const auto& entries = index_it->second; + auto entry_it = + entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name()); + if (entry_it == entries->end()) { *res = false; return Status::OK(); } + const auto& e = entry_it->second; // roaring bitmap cookie header size is 5 if (e->length <= 5) { *res = false; @@ -309,11 +305,11 @@ Status InvertedIndexFileReader::has_null(const TabletIndex* index_meta, bool* re void InvertedIndexFileReader::debug_file_entries() { std::shared_lock lock(_mutex); // Lock for reading - for (auto& index : _indices_entries) { + for (const auto& index : _indices_entries) { LOG(INFO) << "index_id:" << index.first.first; - auto* index_entries = index.second.get(); - for (auto& entry : (*index_entries)) { - ReaderFileEntry* file_entry = entry.second; + const auto& index_entries = index.second; + for (const auto& entry : *index_entries) { + const auto& file_entry = entry.second; LOG(INFO) << "file entry name:" << file_entry->file_name << " length:" << file_entry->length << " offset:" << file_entry->offset; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h index c4264f9b462983..4b8f7d18e012e3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h @@ -42,11 +42,7 @@ class DorisCompoundReader; class InvertedIndexFileReader { public: - using EntriesType = - lucene::util::CLHashMap>; - // Map to hold the file entries for each index ID. + using EntriesType = std::unordered_map>; using IndicesEntriesMap = std::map, std::unique_ptr>; @@ -56,7 +52,7 @@ class InvertedIndexFileReader { : _fs(std::move(fs)), _index_path_prefix(std::move(index_path_prefix)), _storage_format(storage_format), - _idx_file_info(idx_file_info) {} + _idx_file_info(std::move(idx_file_info)) {} Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size, const io::IOContext* io_ctx = nullptr); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 36d82bad1f2d51..08f52920471285 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -200,6 +200,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire lucene::store::IndexInput* tmp = nullptr; CLuceneError err; auto open = dir->openInput(fileName, tmp, err); + std::unique_ptr input(tmp); DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_openInput_error", { open = false; err.set(CL_ERR_IO, "debug point: copyFile_openInput_error"); @@ -212,7 +213,6 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire throw err; } - std::unique_ptr input(tmp); int64_t start_ptr = output->getFilePointer(); int64_t length = input->length(); int64_t remainder = length; @@ -426,15 +426,14 @@ InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id, out_dir->set_file_writer_opts(_opts); std::unique_ptr out_dir_ptr(out_dir); - auto* out = out_dir->createOutput(idx_name.c_str()); + std::unique_ptr output(out_dir->createOutput(idx_name.c_str())); DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr", - { out = nullptr; }); - if (out == nullptr) { + { output = nullptr; }); + if (output == nullptr) { LOG(WARNING) << "InvertedIndexFileWriter::create_output_stream_v1 error: CompoundDirectory " "output is nullptr."; _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); } - std::unique_ptr output(out); return {std::move(out_dir_ptr), std::move(output)}; } @@ -484,8 +483,7 @@ InvertedIndexFileWriter::create_output_stream_v2() { std::unique_ptr out_dir_ptr(out_dir); DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr"; - auto compound_file_output = std::unique_ptr( - out_dir->createOutputV2(_idx_v2_writer.get())); + auto compound_file_output = out_dir->createOutputV2(_idx_v2_writer.get()); return {std::move(out_dir_ptr), std::move(compound_file_output)}; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index c633d29a7fc0c0..7bdca1941fa346 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -23,6 +23,7 @@ #include "inverted_index_desc.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_common.h" #include "olap/tablet_schema.h" #include "util/debug_points.h" #include "util/slice.h" @@ -119,6 +120,7 @@ class DorisFSDirectory::FSIndexOutputV2 : public lucene::store::BufferedIndexOut bool DorisFSDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, const char* path, IndexInput*& ret, CLuceneError& error, int32_t buffer_size, int64_t file_size) { + // no throw error CND_PRECONDITION(path != nullptr, "path is NULL"); if (buffer_size == -1) { @@ -704,21 +706,42 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) { assert(!exists); } auto* ret = _CLNEW FSIndexOutput(); + ErrorContext error_context; ret->set_file_writer_opts(_opts); try { ret->init(_fs, fl); } catch (CLuceneError& err) { - ret->close(); - _CLDELETE(ret) - LOG(WARNING) << "FSIndexOutput init error: " << err.what(); - _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error"); - } + error_context.eptr = std::current_exception(); + error_context.err_msg.append("FSIndexOutput init error: "); + error_context.err_msg.append(err.what()); + LOG(ERROR) << error_context.err_msg; + } + FINALLY_EXCEPTION({ + if (error_context.eptr) { + FINALLY_CLOSE(ret); + _CLDELETE(ret); + } + }) return ret; } -lucene::store::IndexOutput* DorisFSDirectory::createOutputV2(io::FileWriter* file_writer) { - auto* ret = _CLNEW FSIndexOutputV2(); - ret->init(file_writer); +std::unique_ptr DorisFSDirectory::createOutputV2( + io::FileWriter* file_writer) { + auto ret = std::make_unique(); + ErrorContext error_context; + try { + ret->init(file_writer); + } catch (CLuceneError& err) { + error_context.eptr = std::current_exception(); + error_context.err_msg.append("FSIndexOutputV2 init error: "); + error_context.err_msg.append(err.what()); + LOG(ERROR) << error_context.err_msg; + } + FINALLY_EXCEPTION({ + if (error_context.eptr) { + FINALLY_CLOSE(ret); + } + }) return ret; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h index 0bba5b49756070..60e3a132aa4efd 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h @@ -79,7 +79,7 @@ class CLUCENE_EXPORT DorisFSDirectory : public lucene::store::Directory { void renameFile(const char* from, const char* to) override; void touchFile(const char* name) override; lucene::store::IndexOutput* createOutput(const char* name) override; - lucene::store::IndexOutput* createOutputV2(io::FileWriter* file_writer); + std::unique_ptr createOutputV2(io::FileWriter* file_writer); void close() override; std::string toString() const override; static const char* getClassName(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp b/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp index 8d56b913b31c67..a8538dabab8756 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_searcher.cpp @@ -21,6 +21,7 @@ #include #include "common/config.h" +#include "olap/rowset/segment_v2/inverted_index_common.h" #include "olap/rowset/segment_v2/inverted_index_compound_reader.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" @@ -29,10 +30,10 @@ namespace doris::segment_v2 { Status FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory, OptionalIndexSearcherPtr& output_searcher) { auto close_directory = true; - lucene::index::IndexReader* reader = nullptr; + std::unique_ptr reader; try { - reader = lucene::index::IndexReader::open( - directory, config::inverted_index_read_buffer_size, close_directory); + reader = std::unique_ptr(lucene::index::IndexReader::open( + directory, config::inverted_index_read_buffer_size, close_directory)); } catch (const CLuceneError& e) { std::vector file_names; directory->list(&file_names); @@ -44,16 +45,15 @@ Status FulltextIndexSearcherBuilder::build(lucene::store::Directory* directory, return Status::Error(msg); } bool close_reader = true; - auto index_searcher = std::make_shared(reader, close_reader); + reader_size = reader->getTermInfosRAMUsed(); + auto index_searcher = + std::make_shared(reader.release(), close_reader); if (!index_searcher) { output_searcher = std::nullopt; return Status::Error( "FulltextIndexSearcherBuilder build index_searcher error."); } - reader_size = reader->getTermInfosRAMUsed(); - // NOTE: need to cl_refcount-- here, so that directory will be deleted when - // index_searcher is destroyed - _CLDECDELETE(directory) + // NOTE: IndexSearcher takes ownership of the reader, and directory cleanup is handled by caller output_searcher = index_searcher; return Status::OK(); } @@ -69,7 +69,6 @@ Status BKDIndexSearcherBuilder::build(lucene::store::Directory* directory, } reader_size = bkd_reader->ram_bytes_used(); output_searcher = bkd_reader; - _CLDECDELETE(directory) return Status::OK(); } catch (const CLuceneError& e) { return Status::Error( @@ -104,13 +103,13 @@ Result> IndexSearcherBuilder::create_index Result IndexSearcherBuilder::get_index_searcher( lucene::store::Directory* directory) { OptionalIndexSearcherPtr result; - auto st = build(directory, result); + std::unique_ptr directory_ptr(directory); + + auto st = build(directory_ptr.get(), result); if (!st.ok()) { - _CLDECDELETE(directory) return ResultError(st); } if (!result.has_value()) { - _CLDECDELETE(directory) return ResultError(Status::Error( "InvertedIndexSearcherCache build error.")); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index d8688a34acc8c6..827d8127d21d97 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -404,8 +404,6 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { Status add_array_values(size_t field_size, const void* value_ptr, const uint8_t* nested_null_map, const uint8_t* offsets_ptr, size_t count) override { - DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_count_is_zero", - { count = 0; }) if (count == 0) { // no values to add inverted index return Status::OK(); @@ -425,7 +423,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { // every single array row element size to go through the nullmap & value ptr-array, and also can go through the every row in array to keep with _rid++ auto array_elem_size = offsets[i + 1] - offsets[i]; // TODO(Amory).later we use object pool to avoid field creation - lucene::document::Field* new_field = nullptr; + std::unique_ptr new_field; CL_NS(analysis)::TokenStream* ts = nullptr; for (auto j = start_off; j < start_off + array_elem_size; ++j) { if (nested_null_map && nested_null_map[j] == 1) { @@ -440,7 +438,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { continue; } else { // now we temp create field . later make a pool - Status st = create_field(&new_field); + lucene::document::Field* tmp_field = nullptr; + Status st = create_field(&tmp_field); + new_field.reset(tmp_field); DBUG_EXECUTE_IF( "InvertedIndexColumnWriterImpl::add_array_values_create_field_" "error", @@ -469,9 +469,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { char_string_reader.release()); new_field->setValue(ts, own_token_stream); } else { - new_field_char_value(v->get_data(), v->get_size(), new_field); + new_field_char_value(v->get_data(), v->get_size(), new_field.get()); } - _doc->add(*new_field); + _doc->add(*new_field.release()); } } start_off += array_elem_size; @@ -500,7 +500,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { // avoid to add doc which without any field which may make threadState init skip // init fieldDataArray, then will make error with next doc with fields in // resetCurrentFieldData - Status st = create_field(&new_field); + lucene::document::Field* tmp_field = nullptr; + Status st = create_field(&tmp_field); + new_field.reset(tmp_field); DBUG_EXECUTE_IF( "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2", { @@ -513,7 +515,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { << " error:" << st; return st; } - _doc->add(*new_field); + _doc->add(*new_field.release()); RETURN_IF_ERROR(add_null_document()); _doc->clear(); } @@ -774,7 +776,7 @@ Status InvertedIndexColumnWriter::create(const Field* field, } DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index", - { type = FieldType::OLAP_FIELD_TYPE_FLOAT; }) + { type = FieldType::OLAP_FIELD_TYPE_HLL; }) switch (type) { #define M(TYPE) \ case TYPE: \ diff --git a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy index 9fbb245c243ea9..52ce8f87a7443a 100644 --- a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy @@ -219,7 +219,6 @@ suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") { "InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_error", "InvertedIndexColumnWriterImpl::add_values_field_is_nullptr", "InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr", - "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero", "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr", "InvertedIndexColumnWriterImpl::add_array_values_create_field_error", "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2", @@ -262,13 +261,7 @@ suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs(debug_point) run_insert("${tableName}") check_count("${tableName}", 6) - // if debug_point equals InvertedIndexColumnWriterImpl::add_array_values_count_is_zero, run_select(false(abnormal)) - // else run_select(true(normal)) - if (debug_point == "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero") { - run_select("${tableName}", false) - } else { - run_select("${tableName}", true) - } + run_select("${tableName}", true) sql "TRUNCATE TABLE ${tableName}" } catch (Exception e) { log.error("Caught exception: ${e}")