diff --git a/be/src/olap/rowset/segment_v2/inverted_index_common.h b/be/src/olap/rowset/segment_v2/inverted_index_common.h new file mode 100644 index 00000000000000..1fdb7df2931de4 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/inverted_index_common.h @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include // IWYU pragma: keep + +#include + +#include "common/logging.h" + +namespace lucene::store { +class Directory; +} // namespace lucene::store + +namespace doris::segment_v2 { + +struct DirectoryDeleter { + void operator()(lucene::store::Directory* ptr) const { _CLDECDELETE(ptr); } +}; + +struct ErrorContext { + std::string err_msg; + std::exception_ptr eptr; +}; + +template +concept HasClose = requires(T t) { + { t->close() }; +}; + +template + requires HasClose +void finally_close(PtrType& resource, ErrorContext& error_context) { + if (resource) { + try { + resource->close(); + } catch (CLuceneError& err) { + error_context.eptr = std::current_exception(); + error_context.err_msg.append("Error occurred while closing resource: "); + error_context.err_msg.append(err.what()); + LOG(ERROR) << error_context.err_msg; + } catch (...) { + error_context.eptr = std::current_exception(); + error_context.err_msg.append("Error occurred while closing resource"); + LOG(ERROR) << error_context.err_msg; + } + } +} + +#if defined(__clang__) +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunused-macros" +#endif + +#define FINALLY_CLOSE(resource) \ + { \ + static_assert(sizeof(error_context) > 0, \ + "error_context must be defined before using FINALLY macro!"); \ + finally_close(resource, error_context); \ + } + +// Return ERROR after finally +#define FINALLY(finally_block) \ + { \ + static_assert(sizeof(error_context) > 0, \ + "error_context must be defined before using FINALLY macro!"); \ + finally_block; \ + if (error_context.eptr) { \ + return Status::Error(error_context.err_msg); \ + } \ + } + +// Re-throw the exception after finally +#define FINALLY_EXCEPTION(finally_block) \ + { \ + static_assert(sizeof(error_context) > 0, \ + "error_context must be defined before using FINALLY macro!"); \ + finally_block; \ + if (error_context.eptr) { \ + std::rethrow_exception(error_context.eptr); \ + } \ + } + +#if defined(__clang__) +#pragma clang diagnostic pop +#endif + +} // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 08ab1b6cc889b1..4ea6524053473e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -114,9 +114,9 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char int32_t read_buffer_size, bool open_idx_file_cache) : readBufferSize(read_buffer_size), dir(d), - ram_dir(new lucene::store::RAMDirectory()), + ram_dir(std::make_unique()), file_name(name), - entries(_CLNEW EntriesType(true, true)) { + entries(std::make_unique(true, true)) { bool success = false; try { if (dir->fileLength(name) == 0) { @@ -203,7 +203,6 @@ DorisCompoundReader::~DorisCompoundReader() { LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what(); } } - _CLDELETE(entries) } const char* DorisCompoundReader::getClassName() { @@ -314,7 +313,6 @@ void DorisCompoundReader::close() { } if (ram_dir) { ram_dir->close(); - _CLDELETE(ram_dir) } if (dir) { dir->close(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index bc5ae415052f4f..81b8544a1aed5f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -68,12 +68,12 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { int32_t readBufferSize; // base info lucene::store::Directory* dir = nullptr; - lucene::store::RAMDirectory* ram_dir = nullptr; + std::unique_ptr ram_dir; std::string directory; std::string file_name; CL_NS(store)::IndexInput* stream = nullptr; // The life cycle of entries should be consistent with that of the DorisCompoundReader. - EntriesType* entries = nullptr; + std::unique_ptr entries; std::mutex _this_lock; bool _closed = false; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 7b66ee70cbe8f2..87ace2580f2348 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -21,6 +21,7 @@ #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "olap/rowset/segment_v2/inverted_index_cache.h" +#include "olap/rowset/segment_v2/inverted_index_common.h" #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" #include "olap/rowset/segment_v2/inverted_index_reader.h" @@ -137,15 +138,14 @@ Status InvertedIndexFileWriter::close() { if (_storage_format == InvertedIndexStorageFormatPB::V1) { for (const auto& entry : _indices_dirs) { const auto& dir = entry.second; - auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get()); + DorisCompoundFileWriter cfsWriter(dir.get()); // write compound file - _file_size += cfsWriter->writeCompoundFile(); + _file_size += cfsWriter.writeCompoundFile(); // delete index path, which contains separated inverted index files if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) { auto* compound_dir = static_cast(dir.get()); compound_dir->deleteDirectory(); } - _CLDELETE(cfsWriter) } } else { _file_size = write(); @@ -337,50 +337,63 @@ size_t DorisCompoundFileWriter::writeCompoundFile() { ram_dir.close(); auto compound_fs = ((DorisFSDirectory*)directory)->getCompoundFileSystem(); - auto* out_dir = DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str()); + std::unique_ptr out_dir; + std::unique_ptr output; - auto* out = out_dir->createOutput(idx_name.c_str()); - if (out == nullptr) { - LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr."; - _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); - } - std::unique_ptr output(out); - size_t start = output->getFilePointer(); - output->writeVInt(file_count); - // write file entries - int64_t data_offset = header_len; - uint8_t header_buffer[buffer_length]; - for (int i = 0; i < sorted_files.size(); ++i) { - auto file = sorted_files[i]; - output->writeString(file.filename); // FileName - // DataOffset - if (i < header_file_count) { - // file data write in header, so we set its offset to -1. - output->writeLong(-1); - } else { - output->writeLong(data_offset); + ErrorContext error_context; + size_t compound_file_size = 0; + try { + out_dir = std::unique_ptr( + DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str())); + output = std::unique_ptr( + out_dir->createOutput(idx_name.c_str())); + if (output == nullptr) { + LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr."; + _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); } - output->writeLong(file.filesize); // FileLength - if (i < header_file_count) { - // append data - copyFile(file.filename.c_str(), directory, output.get(), header_buffer, buffer_length); - } else { - data_offset += file.filesize; + + size_t start = output->getFilePointer(); + output->writeVInt(file_count); + // write file entries + int64_t data_offset = header_len; + uint8_t header_buffer[buffer_length]; + for (int i = 0; i < sorted_files.size(); ++i) { + auto file = sorted_files[i]; + output->writeString(file.filename); // FileName + // DataOffset + if (i < header_file_count) { + // file data write in header, so we set its offset to -1. + output->writeLong(-1); + } else { + output->writeLong(data_offset); + } + output->writeLong(file.filesize); // FileLength + if (i < header_file_count) { + // append data + copyFile(file.filename.c_str(), directory, output.get(), header_buffer, + buffer_length); + } else { + data_offset += file.filesize; + } } + // write rest files' data + uint8_t data_buffer[buffer_length]; + for (int i = header_file_count; i < sorted_files.size(); ++i) { + auto file = sorted_files[i]; + copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length); + } + + compound_file_size = output->getFilePointer() - start; + } catch (CLuceneError& err) { + error_context.eptr = std::current_exception(); + error_context.err_msg.append("writeCompoundFile exception, error msg: "); + error_context.err_msg.append(err.what()); + LOG(ERROR) << error_context.err_msg; } - // write rest files' data - uint8_t data_buffer[buffer_length]; - for (int i = header_file_count; i < sorted_files.size(); ++i) { - auto file = sorted_files[i]; - copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length); - } - out_dir->close(); - // NOTE: need to decrease ref count, but not to delete here, - // because index cache may get the same directory from DIRECTORIES - _CLDECDELETE(out_dir) - auto compound_file_size = output->getFilePointer() - start; - output->close(); - //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size; + FINALLY_EXCEPTION({ + FINALLY_CLOSE(out_dir); + FINALLY_CLOSE(output); + }) return compound_file_size; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index 9e2e253c40471f..542d3f97eca7d3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -23,6 +23,7 @@ #include "inverted_index_desc.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_common.h" #include "olap/tablet_schema.h" #include "util/debug_points.h" #include "util/slice.h" @@ -516,14 +517,21 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) { assert(!exists); } auto* ret = _CLNEW FSIndexOutput(); + ErrorContext error_context; try { ret->init(fs, fl); } catch (CLuceneError& err) { - ret->close(); - _CLDELETE(ret) - LOG(WARNING) << "FSIndexOutput init error: " << err.what(); - _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error"); - } + error_context.eptr = std::current_exception(); + error_context.err_msg.append("FSIndexOutput init error: "); + error_context.err_msg.append(err.what()); + LOG(ERROR) << error_context.err_msg; + } + FINALLY_EXCEPTION({ + if (error_context.eptr) { + FINALLY_CLOSE(ret); + _CLDELETE(ret); + } + }) return ret; }