Skip to content

Commit

Permalink
[Refact](inverted index) refact inverted index file writer to make it…
Browse files Browse the repository at this point in the history
… more clear (apache#35459)

## Proposed changes

Issue Number: close #xxx

1. This PR refactors path handling and logging in the
InvertedIndexDescriptor and related components, ensuring clarity and
consistency across the codebase.
2. Updated initialization logic for DorisFSDirectory and
DorisRAMFSDirectory, removing confusing cfs and fs logic.
3. Removed DorisCompoundFileWriter and consolidated its logic into
InvertedIndexFileWriter.
4. Removed unused methods get_inverted_index_size and file_size from
several classes.


## Further comments

If this is a relatively large or complex change, kick off the discussion
at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why
you chose the solution you did and what alternatives you considered,
etc...
  • Loading branch information
airborne12 authored Jun 19, 2024
1 parent 5b6d04a commit 23cd498
Show file tree
Hide file tree
Showing 27 changed files with 516 additions and 594 deletions.
31 changes: 12 additions & 19 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,18 +615,13 @@ Status Compaction::do_inverted_index_compaction() {

auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, std::string {InvertedIndexDescriptor::get_index_path_prefix(seg_path)},
fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format());
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache);
if (!st.ok()) {
LOG(ERROR) << "init inverted index "
<< InvertedIndexDescriptor::get_index_path_v2(
InvertedIndexDescriptor::get_index_path_prefix(seg_path))
<< " failed in compaction when init inverted index file reader";
return st;
}
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache),
"inverted_index_file_reader init failed");
inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader);
}

Expand All @@ -636,7 +631,7 @@ Status Compaction::do_inverted_index_compaction() {
dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_path_prefix(ctx.segment_path(i))};
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(i))};
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
Expand All @@ -649,20 +644,16 @@ Status Compaction::do_inverted_index_compaction() {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), i,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_NOT_OK_STATUS_WITH_WARN(
inverted_index_file_writer->initialize(index_not_need_to_compact),
"failed to initialize inverted_index_file_writer for " +
inverted_index_file_writer->get_index_file_path());
RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact));
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), i,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
} else {
LOG(ERROR) << "init inverted index "
<< InvertedIndexDescriptor::get_index_path_v2(index_path_prefix)
<< " failed in compaction when create inverted index file writer";
LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:"
<< st;
return st;
}
}
Expand Down Expand Up @@ -797,7 +788,9 @@ void Compaction::construct_skip_inverted_index(RowsetWriterContext& ctx) {
}

auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, std::string {InvertedIndexDescriptor::get_index_path_prefix(*seg_path)},
fs,
std::string {
InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)},
_cur_tablet_schema->get_inverted_index_storage_format());
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
Expand Down
39 changes: 20 additions & 19 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,29 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(const PNodeInfo& node_info)
local_segment_path(tablet_path, cur_rowset->rowset_id().to_string(), segment_id);
int64_t segment_size = std::filesystem::file_size(seg_path);
request->mutable_segments_size()->insert({segment_id, segment_size});
auto index_path_prefix = InvertedIndexDescriptor::get_index_path_prefix(seg_path);
auto index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(seg_path);
if (!indices_ids.empty()) {
if (tablet_schema->get_inverted_index_storage_format() !=
if (tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
for (auto index_meta : indices_ids) {
std::string inverted_index_file =
InvertedIndexDescriptor::get_index_file_path_v1(
index_path_prefix, index_meta.first, index_meta.second);
int64_t size = std::filesystem::file_size(inverted_index_file);
PTabletWriteSlaveRequest::IndexSize index_size;
index_size.set_indexid(index_meta.first);
index_size.set_size(size);
index_size.set_suffix_path(index_meta.second);
// Fetch the map value for the current segment_id.
// If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue
auto& index_size_map_value =
(*(request->mutable_inverted_indices_size()))[segment_id];
// Add the new index size to the map value.
*index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size);
}
} else {
std::string inverted_index_file =
InvertedIndexDescriptor::get_index_path_v2(index_path_prefix);
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
int64_t size = std::filesystem::file_size(inverted_index_file);
PTabletWriteSlaveRequest::IndexSize index_size;
// special id for non-V1 format
Expand All @@ -296,22 +313,6 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(const PNodeInfo& node_info)
(*(request->mutable_inverted_indices_size()))[segment_id];
// Add the new index size to the map value.
*index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size);
} else {
for (auto index_id : indices_ids) {
std::string inverted_index_file = InvertedIndexDescriptor::get_index_path_v1(
seg_path, index_id.first, index_id.second);
int64_t size = std::filesystem::file_size(inverted_index_file);
PTabletWriteSlaveRequest::IndexSize index_size;
index_size.set_indexid(index_id.first);
index_size.set_size(size);
index_size.set_suffix_path(index_id.second);
// Fetch the map value for the current segment_id.
// If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue
auto& index_size_map_value =
(*(request->mutable_inverted_indices_size()))[segment_id];
// Add the new index size to the map value.
*index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size);
}
}
}
}
Expand Down
Loading

0 comments on commit 23cd498

Please sign in to comment.