[Refactor] (inverted index) Refactor Inverted index file writer (#41625)
## Proposed changes

1. After a normal segment is flushed, `close_inverted_index` is called
directly to write the final compound index file.
2. During compaction, the first step has the `segment writer` write the
`bkd index` while writing the normal data; in the second step, `index
compaction` writes the `string index`; in the third step,
`close_inverted_index` is called uniformly for all indexes to write the
final files.
3. The rowset writer uses `InvertedIndexFileCollection` to hold all
inverted index file writers, ensuring they stay alive for the entire
write or compaction process.
4. When the rowset writer produces the final rowset through
`build(rowset)`, it retrieves the index file sizes from the
`InvertedIndexFileCollection` and records them in the rowset meta (see
the sketch after this list).
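
To make the ownership model in points 3 and 4 concrete, here is a minimal, self-contained C++ toy of the pattern. The names `IndexFileWriter`, `IndexFileCollection`, `writer_for_segment`, and `close_all` are invented for illustration and are not the actual Doris classes; only the lifecycle idea (one collection owns every per-segment writer, closes them once at build time, and reports the final sizes) mirrors this change.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>

// Stand-in for InvertedIndexFileWriter: accumulates bytes and reports the
// final file size when it is closed.
class IndexFileWriter {
public:
    void append(int64_t bytes) { size_ += bytes; }
    int64_t close() { return size_; }

private:
    int64_t size_ = 0;
};

// Stand-in for InvertedIndexFileCollection: owns one writer per segment id for
// the whole write/compaction, so every phase appends through the same object.
class IndexFileCollection {
public:
    IndexFileWriter& writer_for_segment(int32_t seg_id) {
        auto& writer = writers_[seg_id];
        if (!writer) {
            writer = std::make_unique<IndexFileWriter>();
        }
        return *writer;
    }

    // Called once at build() time: close every writer and report per-segment
    // sizes so they can be recorded in the rowset meta.
    std::map<int32_t, int64_t> close_all() {
        std::map<int32_t, int64_t> sizes;
        for (auto& [seg_id, writer] : writers_) {
            sizes[seg_id] = writer->close();
        }
        return sizes;
    }

private:
    std::map<int32_t, std::unique_ptr<IndexFileWriter>> writers_;
};

int main() {
    IndexFileCollection collection;
    // The segment flush and the index compaction phases both append through
    // the same collection ...
    collection.writer_for_segment(0).append(128);
    collection.writer_for_segment(1).append(256);
    // ... and build() closes everything once and collects the final sizes.
    for (const auto& [seg_id, size] : collection.close_all()) {
        std::cout << "segment " << seg_id << " index file size " << size << '\n';
    }
    return 0;
}
```

In the real patch the collection lives in the rowset writer, so both the normal write path and index compaction append through the same writers before `build(rowset)` records the sizes in the rowset meta.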
csun5285 authored Oct 30, 2024
1 parent edfa1b4 commit 4a08bae
Showing 30 changed files with 3,133 additions and 340 deletions.
15 changes: 8 additions & 7 deletions be/src/cloud/cloud_rowset_writer.cpp
@@ -115,13 +115,14 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
} else {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}

if (auto idx_files_info = _idx_files_info.get_inverted_files_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
<< idx_files_info.error();
} else {
_rowset_meta->add_inverted_index_files_info(idx_files_info.value());
if (rowset_schema->has_inverted_index()) {
if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none presents: "
<< idx_files_info.error();
} else {
_rowset_meta->add_inverted_index_files_info(idx_files_info.value());
}
}

RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
@@ -1039,7 +1039,7 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
// tree depth for bkd index
DEFINE_Int32(max_depth_in_bkd_tree, "32");
// index compaction
DEFINE_mBool(inverted_index_compaction_enable, "false");
DEFINE_mBool(inverted_index_compaction_enable, "true");
// Only for debug, do not use in production
DEFINE_mBool(debug_inverted_index_compaction, "false");
// index by RAM directory
158 changes: 39 additions & 119 deletions be/src/olap/compaction.cpp
@@ -188,6 +188,7 @@ Status Compaction::merge_input_rowsets() {
Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
// 1. Merge segment files and write bkd inverted index
if (_is_vertical) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
@@ -200,17 +201,19 @@
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
}
}

_tablet->last_compaction_status = res;

if (!res.ok()) {
return res;
_tablet->last_compaction_status = res;
if (!res.ok()) {
return res;
}
// 2. Merge the remaining inverted index files of the string type
RETURN_IF_ERROR(do_inverted_index_compaction());
}

COUNTER_UPDATE(_merged_rows_counter, _stats.merged_rows);
COUNTER_UPDATE(_filtered_rows_counter, _stats.filtered_rows);

// 3. In the `build`, `_close_file_writers` is called to close the inverted index file writer and write the final compound index file.
RETURN_NOT_OK_STATUS_WITH_WARN(_output_rs_writer->build(_output_rowset),
fmt::format("rowset writer build failed. output_version: {}",
_output_version.to_string()));
@@ -456,8 +459,6 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

RETURN_IF_ERROR(do_inverted_index_compaction());

RETURN_IF_ERROR(modify_rowsets());

auto* cumu_policy = tablet()->cumulative_compaction_policy();
@@ -613,58 +614,9 @@ Status Compaction::do_inverted_index_compaction() {

// dest index files
// format: rowsetId_segmentId
std::vector<std::unique_ptr<InvertedIndexFileWriter>> inverted_index_file_writers(
dest_segment_num);

// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
bool open_idx_file_cache = false;
auto st = inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
open_idx_file_cache);
if (st.ok()) {
auto index_not_need_to_compact =
DORIS_TRY(inverted_index_file_reader->get_all_directories());
// V1: each index is a separate file
// V2: all indexes are in a single file
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
int64_t fsize = 0;
st = ctx.fs()->file_size(
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix), &fsize);
if (!st.ok()) {
LOG(ERROR) << "file size error in index compaction, error:" << st.msg();
return st;
}
compacted_idx_file_size[seg_id] = fsize;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact));
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
// no index file
compacted_idx_file_size[seg_id] = 0;
} else {
LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:"
<< st;
return st;
}
}
for (const auto& writer : inverted_index_file_writers) {
writer->set_file_writer_opts(ctx.get_file_writer_options());
}
auto& inverted_index_file_writers = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get())
->inverted_index_file_writers();
DCHECK_EQ(inverted_index_file_writers.size(), dest_segment_num);

// use tmp file dir to store index files
auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
@@ -690,29 +642,6 @@
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->get_inverted_index(col);

// if index properties are different, index compaction maybe needs to be skipped.
bool is_continue = false;
std::optional<std::map<std::string, std::string>> first_properties;
for (const auto& rowset : _input_rowsets) {
const auto* tablet_index = rowset->tablet_schema()->get_inverted_index(col);
const auto& properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
if (properties != first_properties.value()) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
"if index properties are different, index compaction needs to be "
"skipped.");
is_continue = true;
break;
}
}
}
if (is_continue) {
continue;
}

std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num);
try {
std::vector<std::unique_ptr<DorisCompoundReader>> src_idx_dirs(src_segment_num);
@@ -737,40 +666,12 @@
}
}

std::vector<InvertedIndexFileInfo> all_inverted_index_file_info(dest_segment_num);
uint64_t inverted_index_file_size = 0;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_total_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
all_inverted_index_file_info[seg_id] = inverted_index_file_writer->get_index_file_info();
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
if (!status.ok()) {
return status;
}

// index compaction should update total disk size and index disk size
_output_rowset->rowset_meta()->set_data_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_total_disk_size(_output_rowset->data_disk_size() +
inverted_index_file_size);
_output_rowset->rowset_meta()->set_index_disk_size(_output_rowset->index_disk_size() +
inverted_index_file_size);

_output_rowset->rowset_meta()->update_inverted_index_files_info(all_inverted_index_file_info);
COUNTER_UPDATE(_output_rowset_data_size_counter, _output_rowset->data_disk_size());

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ", input_rowset_size=" << _input_rowsets_size
<< ", output_rowset_size=" << _output_rowset->data_disk_size()
<< ", inverted index file size=" << inverted_index_file_size
<< ". tablet=" << _tablet->tablet_id()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";

return Status::OK();
Expand All @@ -795,6 +696,31 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
if (!field_is_slice_type(_cur_tablet_schema->column_by_uid(col_unique_id).type())) {
continue;
}

// if index properties are different, index compaction maybe needs to be skipped.
bool is_continue = false;
std::optional<std::map<std::string, std::string>> first_properties;
for (const auto& rowset : _input_rowsets) {
const auto* tablet_index =
rowset->tablet_schema()->get_inverted_index(col_unique_id, "");
// no inverted index or index id is different from current index id
if (tablet_index == nullptr || tablet_index->index_id() != index.index_id()) {
is_continue = true;
break;
}
const auto& properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
if (properties != first_properties.value()) {
is_continue = true;
break;
}
}
}
if (is_continue) {
continue;
}
auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
auto* rowset = static_cast<BetaRowset*>(src_rs.get());
if (rowset->is_skip_index_compaction(col_unique_id)) {
@@ -887,9 +813,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx)
if (config::inverted_index_compaction_enable &&
(((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) ||
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_index_compaction_columns(ctx);
}
ctx.version = _output_version;
@@ -1156,8 +1080,6 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {

RETURN_IF_ERROR(merge_input_rowsets());

RETURN_IF_ERROR(do_inverted_index_compaction());

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));

// 4. modify rowsets in memory
@@ -1184,9 +1106,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
if (config::inverted_index_compaction_enable &&
(((_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) ||
_tablet->keys_type() == KeysType::DUP_KEYS)) &&
_cur_tablet_schema->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
_tablet->keys_type() == KeysType::DUP_KEYS))) {
construct_index_compaction_columns(ctx);
}

1 change: 1 addition & 0 deletions be/src/olap/compaction.h
@@ -67,6 +67,7 @@ class Compaction {
protected:
Status merge_input_rowsets();

// merge inverted index files
Status do_inverted_index_compaction();

void construct_index_compaction_columns(RowsetWriterContext& ctx);
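
Read together, the compaction.cpp and compaction.h hunks above change the high-level call order: `do_inverted_index_compaction()` now runs inside `merge_input_rowsets()` right after the segment merge succeeds, instead of as a separate step in `execute_compact_impl()`, and the rowset writer's `build()` is what finally closes the inverted index file writers. Below is a stub-only sketch of that ordering; the function names are placeholders that assume nothing about the real method bodies.

```cpp
#include <iostream>

// Placeholders for the three numbered steps commented in the compaction.cpp
// diff; the real work happens in Merger::*merge_rowsets,
// Compaction::do_inverted_index_compaction, and RowsetWriter::build.
bool merge_segments_and_write_bkd_index() { return true; }   // step 1
bool compact_string_type_inverted_indexes() { return true; } // step 2
bool build_rowset_and_close_index_writers() { return true; } // step 3

int main() {
    // After this commit, step 2 runs immediately after a successful segment
    // merge, inside merge_input_rowsets(), and step 3 happens in build().
    if (!merge_segments_and_write_bkd_index()) return 1;
    if (!compact_string_type_inverted_indexes()) return 1;
    if (!build_rowset_and_close_index_writers()) return 1;
    std::cout << "compaction finished" << std::endl;
    return 0;
}
```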