Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <CLucene.h> // IWYU pragma: keep

#include <exception>
#include <memory>
#include <string>

#include "common/logging.h"

namespace lucene::store {
class Directory;
} // namespace lucene::store

namespace doris::segment_v2 {

struct DirectoryDeleter {
void operator()(lucene::store::Directory* ptr) const { _CLDECDELETE(ptr); }
};

/// Error state shared between a guarded operation and its cleanup code.
/// finally_close() fills it in when closing a resource fails, and the
/// FINALLY / FINALLY_EXCEPTION macros inspect `eptr` to decide whether an
/// error has to be reported.
struct ErrorContext {
    /// Human-readable description of the failure(s); empty when nothing failed.
    std::string err_msg {};
    /// Captured exception; null when nothing failed.
    std::exception_ptr eptr {};
};

/// Satisfied by any pointer-like type whose pointee exposes a `close()` member
/// callable through `operator->` (raw pointers and smart pointers alike).
template <typename T>
concept HasClose = requires(T handle) { handle->close(); };

/// Closes `resource` without letting a failure escape.
///
/// Does nothing when `resource` is null. Otherwise calls `resource->close()`;
/// any exception thrown by it is captured into `error_context.eptr`, a
/// description is appended to `error_context.err_msg`, and the accumulated
/// message is logged at ERROR level.
///
/// Note that a close failure replaces whatever exception was previously stored
/// in `error_context.eptr`; only `err_msg` keeps the earlier failure text.
///
/// @param resource       pointer-like handle whose pointee provides `close()`.
/// @param error_context  receives the captured exception and message.
template <typename PtrType>
    requires HasClose<PtrType>
void finally_close(PtrType& resource, ErrorContext& error_context) {
    if (!resource) {
        return;
    }

    // Shared by both handlers. Must only be called from inside a catch block so
    // that std::current_exception() refers to the close() failure.
    auto record_failure = [&error_context](const char* detail) {
        error_context.eptr = std::current_exception();
        // Keep earlier failures readable instead of gluing messages together.
        if (!error_context.err_msg.empty()) {
            error_context.err_msg.append("; ");
        }
        error_context.err_msg.append("Error occurred while closing resource");
        if (detail != nullptr) {
            error_context.err_msg.append(": ");
            error_context.err_msg.append(detail);
        }
        LOG(ERROR) << error_context.err_msg;
    };

    try {
        resource->close();
    } catch (CLuceneError& err) {
        // NOTE(review): caught by non-const reference on purpose - what() may
        // not be callable on a const CLuceneError; confirm before changing.
        record_failure(err.what());
    } catch (...) {
        record_failure(nullptr);
    }
}

#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-macros"
#endif

// Closes `resource` through finally_close(), recording any failure in the
// variable named `error_context`, which must be visible where the macro is
// expanded. Expands to a braced block, so it works with or without a
// trailing ';'.
#define FINALLY_CLOSE(resource) \
    { \
        static_assert(sizeof(error_context) > 0, \
                      "error_context must be defined before using FINALLY_CLOSE macro!"); \
        finally_close(resource, error_context); \
    }

// Return ERROR after finally.
//
// Executes the given cleanup statements and then, if `error_context.eptr` is
// set, returns an INVERTED_INDEX_CLUCENE_ERROR Status carrying
// `error_context.err_msg` from the enclosing function. A variable named
// `error_context` must be visible where the macro is expanded.
//
// The macro is variadic so that the cleanup block may contain top-level commas
// (e.g. `int a, b;` or a lambda capture list); braces do not protect commas
// from the preprocessor. Expands to a braced block, so it works with or
// without a trailing ';'.
#define FINALLY(...) \
    { \
        static_assert(sizeof(error_context) > 0, \
                      "error_context must be defined before using FINALLY macro!"); \
        __VA_ARGS__; \
        if (error_context.eptr) { \
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(error_context.err_msg); \
        } \
    }

// Re-throw the exception after finally.
//
// Executes the given cleanup statements and then, if `error_context.eptr` is
// set, rethrows that exception. A variable named `error_context` must be
// visible where the macro is expanded.
//
// The macro is variadic so that the cleanup block may contain top-level commas
// (e.g. `int a, b;` or a lambda capture list); braces do not protect commas
// from the preprocessor. Expands to a braced block, so it works with or
// without a trailing ';'.
#define FINALLY_EXCEPTION(...) \
    { \
        static_assert(sizeof(error_context) > 0, \
                      "error_context must be defined before using FINALLY_EXCEPTION macro!"); \
        __VA_ARGS__; \
        if (error_context.eptr) { \
            std::rethrow_exception(error_context.eptr); \
        } \
    }

#if defined(__clang__)
#pragma clang diagnostic pop
#endif

} // namespace doris::segment_v2
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ DorisCompoundReader::DorisCompoundReader(lucene::store::Directory* d, const char
int32_t read_buffer_size, bool open_idx_file_cache)
: readBufferSize(read_buffer_size),
dir(d),
ram_dir(new lucene::store::RAMDirectory()),
ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
file_name(name),
entries(_CLNEW EntriesType(true, true)) {
entries(std::make_unique<EntriesType>(true, true)) {
bool success = false;
try {
if (dir->fileLength(name) == 0) {
Expand Down Expand Up @@ -203,7 +203,6 @@ DorisCompoundReader::~DorisCompoundReader() {
LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
}
}
_CLDELETE(entries)
}

const char* DorisCompoundReader::getClassName() {
Expand Down Expand Up @@ -314,7 +313,6 @@ void DorisCompoundReader::close() {
}
if (ram_dir) {
ram_dir->close();
_CLDELETE(ram_dir)
}
if (dir) {
dir->close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
int32_t readBufferSize;
// base info
lucene::store::Directory* dir = nullptr;
lucene::store::RAMDirectory* ram_dir = nullptr;
std::unique_ptr<lucene::store::RAMDirectory> ram_dir;
std::string directory;
std::string file_name;
CL_NS(store)::IndexInput* stream = nullptr;
// The life cycle of entries should be consistent with that of the DorisCompoundReader.
EntriesType* entries = nullptr;
std::unique_ptr<EntriesType> entries;
std::mutex _this_lock;
bool _closed = false;

Expand Down
99 changes: 56 additions & 43 deletions be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
Expand Down Expand Up @@ -137,15 +138,14 @@ Status InvertedIndexFileWriter::close() {
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get());
DorisCompoundFileWriter cfsWriter(dir.get());
// write compound file
_file_size += cfsWriter->writeCompoundFile();
_file_size += cfsWriter.writeCompoundFile();
// delete index path, which contains separated inverted index files
if (std::strcmp(dir->getObjectName(), "DorisFSDirectory") == 0) {
auto* compound_dir = static_cast<DorisFSDirectory*>(dir.get());
compound_dir->deleteDirectory();
}
_CLDELETE(cfsWriter)
}
} else {
_file_size = write();
Expand Down Expand Up @@ -337,50 +337,63 @@ size_t DorisCompoundFileWriter::writeCompoundFile() {
ram_dir.close();

auto compound_fs = ((DorisFSDirectory*)directory)->getCompoundFileSystem();
auto* out_dir = DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str());
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> out_dir;
std::unique_ptr<lucene::store::IndexOutput> output;

auto* out = out_dir->createOutput(idx_name.c_str());
if (out == nullptr) {
LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
}
std::unique_ptr<lucene::store::IndexOutput> output(out);
size_t start = output->getFilePointer();
output->writeVInt(file_count);
// write file entries
int64_t data_offset = header_len;
uint8_t header_buffer[buffer_length];
for (int i = 0; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
output->writeString(file.filename); // FileName
// DataOffset
if (i < header_file_count) {
// file data write in header, so we set its offset to -1.
output->writeLong(-1);
} else {
output->writeLong(data_offset);
ErrorContext error_context;
size_t compound_file_size = 0;
try {
out_dir = std::unique_ptr<lucene::store::Directory, DirectoryDeleter>(
DorisFSDirectoryFactory::getDirectory(compound_fs, idx_path.c_str()));
output = std::unique_ptr<lucene::store::IndexOutput>(
out_dir->createOutput(idx_name.c_str()));
if (output == nullptr) {
LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
}
output->writeLong(file.filesize); // FileLength
if (i < header_file_count) {
// append data
copyFile(file.filename.c_str(), directory, output.get(), header_buffer, buffer_length);
} else {
data_offset += file.filesize;

size_t start = output->getFilePointer();
output->writeVInt(file_count);
// write file entries
int64_t data_offset = header_len;
uint8_t header_buffer[buffer_length];
for (int i = 0; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
output->writeString(file.filename); // FileName
// DataOffset
if (i < header_file_count) {
// file data write in header, so we set its offset to -1.
output->writeLong(-1);
} else {
output->writeLong(data_offset);
}
output->writeLong(file.filesize); // FileLength
if (i < header_file_count) {
// append data
copyFile(file.filename.c_str(), directory, output.get(), header_buffer,
buffer_length);
} else {
data_offset += file.filesize;
}
}
// write rest files' data
uint8_t data_buffer[buffer_length];
for (int i = header_file_count; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length);
}

compound_file_size = output->getFilePointer() - start;
} catch (CLuceneError& err) {
error_context.eptr = std::current_exception();
error_context.err_msg.append("writeCompoundFile exception, error msg: ");
error_context.err_msg.append(err.what());
LOG(ERROR) << error_context.err_msg;
}
// write rest files' data
uint8_t data_buffer[buffer_length];
for (int i = header_file_count; i < sorted_files.size(); ++i) {
auto file = sorted_files[i];
copyFile(file.filename.c_str(), directory, output.get(), data_buffer, buffer_length);
}
out_dir->close();
// NOTE: need to decrease ref count, but not to delete here,
// because index cache may get the same directory from DIRECTORIES
_CLDECDELETE(out_dir)
auto compound_file_size = output->getFilePointer() - start;
output->close();
//LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size;
FINALLY_EXCEPTION({
FINALLY_CLOSE(out_dir);
FINALLY_CLOSE(output);
})
return compound_file_size;
}

Expand Down
18 changes: 13 additions & 5 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "inverted_index_desc.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "olap/rowset/segment_v2/inverted_index_common.h"
#include "olap/tablet_schema.h"
#include "util/debug_points.h"
#include "util/slice.h"
Expand Down Expand Up @@ -516,14 +517,21 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) {
assert(!exists);
}
auto* ret = _CLNEW FSIndexOutput();
ErrorContext error_context;
try {
ret->init(fs, fl);
} catch (CLuceneError& err) {
ret->close();
_CLDELETE(ret)
LOG(WARNING) << "FSIndexOutput init error: " << err.what();
_CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
}
error_context.eptr = std::current_exception();
error_context.err_msg.append("FSIndexOutput init error: ");
error_context.err_msg.append(err.what());
LOG(ERROR) << error_context.err_msg;
}
FINALLY_EXCEPTION({
if (error_context.eptr) {
FINALLY_CLOSE(ret);
_CLDELETE(ret);
}
})
return ret;
}

Expand Down
Loading