Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "inverted_index_compaction.h"

#include "inverted_index_common.h"
#include "inverted_index_file_writer.h"
#include "inverted_index_fs_directory.h"
#include "io/fs/local_file_system.h"
Expand All @@ -41,15 +42,17 @@ Status compact_column(int64_t index_id,
"debug point: index compaction error");
}
})

bool can_use_ram_dir = true;
lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory(
io::global_local_filesystem(), tmp_path.data(), can_use_ram_dir);
std::unique_ptr<lucene::store::Directory, DirectoryDeleter> dir(
DorisFSDirectoryFactory::getDirectory(io::global_local_filesystem(), tmp_path.data(),
can_use_ram_dir));
DBUG_EXECUTE_IF("compact_column_getDirectory_error", {
_CLTHROWA(CL_ERR_IO, "debug point: compact_column_getDirectory_error in index compaction");
})
lucene::analysis::SimpleAnalyzer<char> analyzer;
auto* index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */,
true /* closeDirOnShutdown */);
std::unique_ptr<lucene::index::IndexWriter> index_writer(_CLNEW lucene::index::IndexWriter(
dir.get(), &analyzer, true /* create */, true /* closeDirOnShutdown */));
DBUG_EXECUTE_IF("compact_column_create_index_writer_error", {
_CLTHROWA(CL_ERR_IO,
"debug point: compact_column_create_index_writer_error in index compaction");
Expand All @@ -71,11 +74,6 @@ Status compact_column(int64_t index_id,
_CLTHROWA(CL_ERR_IO,
"debug point: compact_column_index_writer_close_error in index compaction");
})
_CLDELETE(index_writer);
// NOTE: need to ref_cnt-- for dir,
// when index_writer is destroyed, if closeDir is set, dir will be close
// _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed.
_CLDECDELETE(dir)

// delete temporary segment_path, only when inverted_index_ram_dir_enable is false
if (!config::inverted_index_ram_dir_enable) {
Expand Down
98 changes: 47 additions & 51 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
#include <CLucene/debug/mem.h>
#include <CLucene/store/RAMDirectory.h>
#include <CLucene/util/Misc.h>
#include <stdio.h>
#include <string.h>
#include <wchar.h>

#include <cstdio>
#include <cstring>
#include <cwchar>
#include <memory>
#include <utility>

Expand All @@ -34,22 +34,13 @@
#include "olap/tablet_schema.h"
#include "util/debug_points.h"

namespace doris {
namespace io {
namespace doris::io {
class FileWriter;
} // namespace io
} // namespace doris
} // namespace doris::io

#define BUFFER_LENGTH 16384
#define CL_MAX_PATH 4096

#define STRDUP_WtoA(x) CL_NS(util)::Misc::_wideToChar(x)
#define STRDUP_TtoA STRDUP_WtoA

using FileWriterPtr = std::unique_ptr<doris::io::FileWriter>;

namespace doris {
namespace segment_v2 {
namespace doris::segment_v2 {

/** Implementation of an IndexInput that reads from a portion of the
* compound file.
Expand Down Expand Up @@ -94,7 +85,7 @@ CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& fi
void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
std::lock_guard wlock(((DorisFSDirectory::FSIndexInput*)base)->_this_lock);

int64_t start = getFilePointer();
auto start = getFilePointer();
if (start + len > _length) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
Expand Down Expand Up @@ -153,56 +144,63 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream,
EntriesType* entries_clone, int32_t read_buffer_size,
const EntriesType& entries_clone, int32_t read_buffer_size,
const io::IOContext* io_ctx)
: _stream(stream),
_entries(_CLNEW EntriesType(true, true)),
_entries(std::make_unique<EntriesType>()),
_read_buffer_size(read_buffer_size) {
// After stream clone, the io_ctx needs to be reconfigured.
initialize(io_ctx);

for (auto& e : *entries_clone) {
auto* origin_entry = e.second;
auto* entry = _CLNEW ReaderFileEntry();
char* aid = strdup(e.first);
for (const auto& e : entries_clone) {
const auto& origin_entry = e.second;
auto entry = std::make_unique<ReaderFileEntry>();
entry->file_name = origin_entry->file_name;
entry->offset = origin_entry->offset;
entry->length = origin_entry->length;
_entries->put(aid, entry);
(*_entries)[e.first] = std::move(entry);
}
};

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size,
const io::IOContext* io_ctx)
: _ram_dir(new lucene::store::RAMDirectory()),
: _ram_dir(std::make_unique<lucene::store::RAMDirectory>()),
_stream(stream),
_entries(_CLNEW EntriesType(true, true)),
_entries(std::make_unique<EntriesType>()),
_read_buffer_size(read_buffer_size) {
// After stream clone, the io_ctx needs to be reconfigured.
initialize(io_ctx);

try {
int32_t count = _stream->readVInt();
ReaderFileEntry* entry = nullptr;
TCHAR tid[CL_MAX_PATH];
uint8_t buffer[BUFFER_LENGTH];
for (int32_t i = 0; i < count; i++) {
entry = _CLNEW ReaderFileEntry();
stream->readString(tid, CL_MAX_PATH);
char* aid = STRDUP_TtoA(tid);
entry->file_name = aid;
auto entry = std::make_unique<ReaderFileEntry>();
// Read the string length first
int32_t string_length = stream->readVInt();
// Allocate appropriate buffer for the string
std::wstring tid;
tid.resize(string_length);
// Read the string characters directly
stream->readChars(tid.data(), 0, string_length);
std::string file_name_str(tid.begin(), tid.end());
entry->file_name = file_name_str;
entry->offset = stream->readLong();
entry->length = stream->readLong();
VLOG_DEBUG << "string_length:" << string_length << " file_name:" << entry->file_name
<< " offset:" << entry->offset << " length:" << entry->length;
DBUG_EXECUTE_IF("construct_DorisCompoundReader_failed", {
CLuceneError err;
err.set(CL_ERR_IO, "construct_DorisCompoundReader_failed");
throw err;
})
_entries->put(aid, entry);
// read header file data
if (entry->offset < 0) {
copyFile(entry->file_name.c_str(), entry->length, buffer, BUFFER_LENGTH);
//if offset is -1, it means it's size is lower than DorisFSDirectory::MAX_HEADER_DATA_SIZE, which is 128k.
copyFile(entry->file_name.c_str(), static_cast<int64_t>(entry->length), buffer,
BUFFER_LENGTH);
}
_entries->emplace(std::move(file_name_str), std::move(entry));
}
} catch (...) {
try {
Expand All @@ -212,11 +210,9 @@ DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32
}
if (_entries != nullptr) {
_entries->clear();
_CLDELETE(_entries);
}
if (_ram_dir) {
_ram_dir->close();
_CLDELETE(_ram_dir)
}
} catch (CLuceneError& err) {
if (err.number() != CL_ERR_IO) {
Expand All @@ -231,11 +227,12 @@ void DorisCompoundReader::copyFile(const char* file, int64_t file_length, uint8_
int64_t buffer_length) {
std::unique_ptr<lucene::store::IndexOutput> output(_ram_dir->createOutput(file));
int64_t start_ptr = output->getFilePointer();
int64_t remainder = file_length;
int64_t chunk = buffer_length;
auto remainder = file_length;
auto chunk = buffer_length;
auto batch_len = file_length < chunk ? file_length : chunk;

while (remainder > 0) {
int64_t len = std::min(std::min(chunk, file_length), remainder);
auto len = remainder < batch_len ? remainder : batch_len;
_stream->readBytes(buffer, len);
output->writeBytes(buffer, len);
remainder -= len;
Expand Down Expand Up @@ -270,7 +267,6 @@ DorisCompoundReader::~DorisCompoundReader() {
LOG(ERROR) << "DorisCompoundReader finalize error:" << err.what();
}
}
_CLDELETE(_entries)
}

const char* DorisCompoundReader::getClassName() {
Expand All @@ -284,8 +280,8 @@ bool DorisCompoundReader::list(std::vector<std::string>* names) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
for (EntriesType::const_iterator i = _entries->begin(); i != _entries->end(); i++) {
names->push_back(i->first);
for (const auto& entry : *_entries) {
names->push_back(entry.first);
}
return true;
}
Expand All @@ -294,7 +290,7 @@ bool DorisCompoundReader::fileExists(const char* name) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
return _entries->exists((char*)name);
return _entries->find(std::string(name)) != _entries->end();
}

int64_t DorisCompoundReader::fileModified(const char* name) const {
Expand All @@ -305,15 +301,15 @@ int64_t DorisCompoundReader::fileLength(const char* name) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
ReaderFileEntry* e = _entries->get((char*)name);
if (e == nullptr) {
auto it = _entries->find(std::string(name));
if (it == _entries->end()) {
char buf[CL_MAX_PATH + 30];
strcpy(buf, "File ");
strncat(buf, name, CL_MAX_PATH);
strcat(buf, " does not exist");
_CLTHROWA(CL_ERR_IO, buf);
}
return e->length;
return it->second->length;
}

bool DorisCompoundReader::openInput(const char* name,
Expand All @@ -338,14 +334,16 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*
return false;
}

const ReaderFileEntry* entry = _entries->get((char*)name);
if (entry == nullptr) {
auto it = _entries->find(std::string(name));
if (it == _entries->end()) {
char buf[CL_MAX_PATH + 26];
snprintf(buf, CL_MAX_PATH + 26, "No sub-file with id %s found", name);
error.set(CL_ERR_IO, buf);
return false;
}

const auto& entry = it->second;

// If file is in RAM, just return.
if (_ram_dir && _ram_dir->fileExists(name)) {
return _ram_dir->openInput(name, ret, error, bufferSize);
Expand Down Expand Up @@ -374,7 +372,6 @@ void DorisCompoundReader::close() {
}
if (_ram_dir) {
_ram_dir->close();
_CLDELETE(_ram_dir)
}
_closed = true;
}
Expand All @@ -400,7 +397,7 @@ lucene::store::IndexOutput* DorisCompoundReader::createOutput(const char* /*name
}

std::string DorisCompoundReader::toString() const {
return std::string("DorisCompoundReader@");
return "DorisCompoundReader@";
}

CL_NS(store)::IndexInput* DorisCompoundReader::getDorisIndexInput() {
Expand All @@ -412,5 +409,4 @@ void DorisCompoundReader::initialize(const io::IOContext* io_ctx) {
_stream->setIdxFileCache(true);
}

} // namespace segment_v2
} // namespace doris
} // namespace doris::segment_v2
22 changes: 9 additions & 13 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
#include <CLucene/store/IndexOutput.h>
#include <CLucene/util/Equators.h>
#include <CLucene/util/VoidMap.h>
#include <stdint.h>

#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand All @@ -43,9 +44,7 @@ namespace lucene::store {
class RAMDirectory;
} // namespace lucene::store

namespace doris {
class TabletIndex;
namespace segment_v2 {
namespace doris::segment_v2 {

class ReaderFileEntry : LUCENE_BASE {
public:
Expand All @@ -60,16 +59,14 @@ class ReaderFileEntry : LUCENE_BASE {
~ReaderFileEntry() override = default;
};

using EntriesType =
lucene::util::CLHashMap<char*, ReaderFileEntry*, lucene::util::Compare::Char,
lucene::util::Equals::Char, lucene::util::Deletor::acArray,
lucene::util::Deletor::Object<ReaderFileEntry>>;
using EntriesType = std::unordered_map<std::string, std::unique_ptr<ReaderFileEntry>>;

class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
private:
lucene::store::RAMDirectory* _ram_dir = nullptr;
std::unique_ptr<lucene::store::RAMDirectory> _ram_dir;
CL_NS(store)::IndexInput* _stream = nullptr;
// The life cycle of _entries should be consistent with that of the DorisCompoundReader.
EntriesType* _entries = nullptr;
std::unique_ptr<EntriesType> _entries;
std::mutex _this_lock;
bool _closed = false;
int32_t _read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE;
Expand All @@ -79,7 +76,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
bool doDeleteFile(const char* name) override;

public:
DorisCompoundReader(CL_NS(store)::IndexInput* stream, EntriesType* entries_clone,
DorisCompoundReader(CL_NS(store)::IndexInput* stream, const EntriesType& entries_clone,
int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE,
const io::IOContext* io_ctx = nullptr);
DorisCompoundReader(CL_NS(store)::IndexInput* stream,
Expand Down Expand Up @@ -109,5 +106,4 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
void initialize(const io::IOContext* io_ctx);
};

} // namespace segment_v2
} // namespace doris
} // namespace doris::segment_v2
Loading
Loading