Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](binlog)Support inverted index in CCR #31743

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const std::string kTabletIdParameter = "tablet_id";
const std::string kBinlogVersionParameter = "binlog_version";
const std::string kRowsetIdParameter = "rowset_id";
const std::string kSegmentIndexParameter = "segment_index";
const std::string kSegmentIndexIdParameter = "segment_index_id";

// get http param, if no value throw exception
const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
Expand Down Expand Up @@ -131,6 +132,42 @@ void handle_get_segment_file(StorageEngine& engine, HttpRequest* req,
do_file_response(segment_file_path, req, rate_limit_group);
}

/// Handle the "get_segment_index_file" download method.
///
/// Required HTTP parameters: tablet_id, rowset_id, segment_index and segment_index_id.
/// The index file path is resolved via tablet->get_segment_index_filepath() and, if the
/// file exists on the local filesystem, it is sent back through do_file_response().
///
/// Replies sent on failure:
///   - INTERNAL_SERVER_ERROR when resolving the path throws (e.g. a required parameter
///     has no value) or when the existence check itself fails;
///   - NOT_FOUND when the resolved file does not exist.
void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
                                   bufferevent_rate_limit_group* rate_limit_group) {
    // Step 1: get download file path
    std::string segment_index_file_path;
    try {
        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
        auto tablet = get_tablet(engine, tablet_id);
        const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
        const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
        // segment_index_id is mandatory like the other parameters: go through
        // get_http_param so that a missing value is reported as an error instead of
        // silently producing a "<rowset>_<segment>_.idx" path that ends up as a 404.
        const auto& segment_index_id = get_http_param(req, kSegmentIndexIdParameter);
        // NOTE(review): rowset_id / segment_index / segment_index_id come straight from the
        // request and are interpolated into a filesystem path; confirm they are validated
        // (no path separators / "..") somewhere before reaching this point.
        segment_index_file_path =
                tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
    } catch (const std::exception& e) {
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
        LOG(WARNING) << "get download file path failed, error: " << e.what();
        return;
    }

    // Step 2: handle download
    // check file exists
    bool exists = false;
    Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists);
    if (!status.ok()) {
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
        LOG(WARNING) << "check file exists failed, error: " << status.to_string();
        return;
    }
    if (!exists) {
        HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
        LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
        return;
    }
    do_file_response(segment_index_file_path, req, rate_limit_group);
}

void handle_get_rowset_meta(StorageEngine& engine, HttpRequest* req) {
try {
const auto& tablet_id = get_http_param(req, kTabletIdParameter);
Expand Down Expand Up @@ -185,6 +222,8 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
handle_get_binlog_info(_engine, req);
} else if (method == "get_segment_file") {
handle_get_segment_file(_engine, req, _rate_limit_group.get());
} else if (method == "get_segment_index_file") {
handle_get_segment_index_file(_engine, req, _rate_limit_group.get());
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(_engine, req);
} else {
Expand Down
44 changes: 41 additions & 3 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,14 +439,30 @@ Status BetaRowset::add_to_binlog() {
if (fs->type() != io::FileSystemType::LOCAL) {
return Status::InternalError("should be local file system");
}
io::LocalFileSystem* local_fs = static_cast<io::LocalFileSystem*>(fs.get());
auto* local_fs = static_cast<io::LocalFileSystem*>(fs.get());

// all segments are in the same directory, so cache binlog_dir without multi times check
std::string binlog_dir;

auto segments_num = num_segments();
VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}",
rowset_id().to_string(), segments_num);

Status status;
std::vector<string> linked_success_files;
Defer remove_linked_files {[&]() { // clear linked files if errors happen
if (!status.ok()) {
LOG(WARNING) << "will delete linked success files due to error " << status;
std::vector<io::Path> paths;
for (auto& file : linked_success_files) {
paths.emplace_back(file);
LOG(WARNING) << "will delete linked success file " << file << " due to error";
}
static_cast<void>(local_fs->batch_delete(paths));
LOG(WARNING) << "done delete linked success files due to error " << status;
}
}};

for (int i = 0; i < segments_num; ++i) {
auto seg_file = segment_file_path(i);

Expand All @@ -465,8 +481,30 @@ Status BetaRowset::add_to_binlog() {
.string();
VLOG_DEBUG << "link " << seg_file << " to " << binlog_file;
if (!local_fs->link_file(seg_file, binlog_file).ok()) {
return Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
seg_file, binlog_file, Errno::no());
status = Status::Error<OS_ERROR>("fail to create hard link. from={}, to={}, errno={}",
seg_file, binlog_file, Errno::no());
return status;
}
linked_success_files.push_back(binlog_file);

for (const auto& index : _schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}
auto index_id = index.index_id();
auto index_file = InvertedIndexDescriptor::get_index_file_name(
seg_file, index_id, index.get_index_suffix());
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
std::filesystem::path(index_file).filename())
.string();
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={}, errno={}", index_file,
binlog_index_file, Errno::no());
return status;
}
linked_success_files.push_back(binlog_index_file);
}
}

Expand Down
53 changes: 52 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,39 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
break;
}

for (auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) {
std::string segment_file_path;
auto num_segments = rowset_binlog_meta.num_segments();
std::string_view rowset_id = rowset_binlog_meta.rowset_id();

RowsetMetaPB rowset_meta_pb;
if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) {
auto err_msg = fmt::format("fail to parse binlog meta data value:{}",
rowset_binlog_meta.data());
res = Status::InternalError(err_msg);
LOG(WARNING) << err_msg;
return res;
}
const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema();
TabletSchema tablet_schema;
tablet_schema.init_from_pb(tablet_schema_pb);

std::vector<string> linked_success_files;
Defer remove_linked_files {[&]() { // clear linked files if errors happen
if (!res.ok()) {
LOG(WARNING) << "will delete linked success files due to error " << res;
std::vector<io::Path> paths;
for (auto& file : linked_success_files) {
paths.emplace_back(file);
LOG(WARNING)
<< "will delete linked success file " << file << " due to error";
}
static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
LOG(WARNING) << "done delete linked success files due to error " << res;
}
}};

// link segment files and index files
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
segment_file_path = ref_tablet->get_segment_filepath(rowset_id, segment_index);
auto snapshot_segment_file_path =
Expand All @@ -630,6 +658,29 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
<< ", dest=" << snapshot_segment_file_path << "]";
break;
}
linked_success_files.push_back(snapshot_segment_file_path);

for (const auto& index : tablet_schema.indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}
auto index_id = index.index_id();
auto index_file = ref_tablet->get_segment_index_filepath(
rowset_id, segment_index, index_id);
auto snapshot_segment_index_file_path =
fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id,
segment_index, index_id);
VLOG_DEBUG << "link " << index_file << " to "
<< snapshot_segment_index_file_path;
res = io::global_local_filesystem()->link_file(
index_file, snapshot_segment_index_file_path);
if (!res.ok()) {
LOG(WARNING) << "fail to link binlog index file. [src=" << index_file
<< ", dest=" << snapshot_segment_index_file_path << "]";
break;
}
linked_success_files.push_back(snapshot_segment_index_file_path);
}
}

if (!res.ok()) {
Expand Down
26 changes: 22 additions & 4 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2657,6 +2657,19 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index);
}

/// Build the path of an inverted index file stored under this tablet's `_binlog` directory.
/// The three identifiers are taken as text and inserted verbatim.
/// Layout: {_tablet_path}/_binlog/{rowset_id}_{segment_index}_{index_id}.idx
std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
                                               std::string_view segment_index,
                                               std::string_view index_id) const {
    // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
    std::string index_file_path = fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
                                              segment_index, index_id);
    return index_file_path;
}

/// Numeric overload: build the path of an inverted index file stored under this tablet's
/// `_binlog` directory from an integer segment index and integer index id.
/// Layout: {_tablet_path}/_binlog/{rowset_id}_{segment_index}_{index_id}.idx
std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
                                               int64_t index_id) const {
    // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
    std::string index_file_path = fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id,
                                              segment_index, index_id);
    return index_file_path;
}

std::vector<std::string> Tablet::get_binlog_filepath(std::string_view binlog_version) const {
const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version);
std::vector<std::string> binlog_filepath;
Expand Down Expand Up @@ -2686,7 +2699,6 @@ void Tablet::gc_binlogs(int64_t version) {

const auto& tablet_uid = this->tablet_uid();
const auto tablet_id = this->tablet_id();
const auto& tablet_path = this->tablet_path();
std::string begin_key = make_binlog_meta_key_prefix(tablet_uid);
std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1);
LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{}, end_key:{}", tablet_id,
Expand All @@ -2700,10 +2712,16 @@ void Tablet::gc_binlogs(int64_t version) {
wait_for_deleted_binlog_keys.emplace_back(key);
wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key));

// add binlog segment files and index files
for (int64_t i = 0; i < num_segments; ++i) {
auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
wait_for_deleted_binlog_files.emplace_back(
fmt::format("{}/_binlog/{}", tablet_path, segment_file));
wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i));
for (const auto& index : this->tablet_schema()->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}
wait_for_deleted_binlog_files.emplace_back(
get_segment_index_filepath(rowset_id, i, index.index_id()));
}
}
};

Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,11 @@ class Tablet final : public BaseTablet {
std::string get_segment_filepath(std::string_view rowset_id,
std::string_view segment_index) const;
std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
// Returns the path of a binlog inverted index file:
// "{tablet path}/_binlog/{rowset_id}_{segment_index}_{index_id}.idx".
// This overload takes segment_index and index_id as text and inserts them verbatim.
std::string get_segment_index_filepath(std::string_view rowset_id,
std::string_view segment_index,
std::string_view index_id) const;
// Returns the path of a binlog inverted index file:
// "{tablet path}/_binlog/{rowset_id}_{segment_index}_{index_id}.idx".
// This overload takes segment_index and index_id as integers.
std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
int64_t index_id) const;
bool can_add_binlog(uint64_t total_binlog_size) const;
void gc_binlogs(int64_t version);
Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
Expand Down
17 changes: 13 additions & 4 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,19 +943,28 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists));
for (auto& file : files) {
auto& filename = file.file_name;
if (!filename.ends_with(".binlog")) {
std::string new_suffix;
std::string old_suffix;

if (filename.ends_with(".binlog")) {
old_suffix = ".binlog";
new_suffix = ".dat";
} else if (filename.ends_with(".binlog-index")) {
old_suffix = ".binlog-index";
new_suffix = ".idx";
} else {
continue;
}

// change clone_file suffix: .binlog -> .dat, .binlog-index -> .idx
std::string new_filename = filename;
new_filename.replace(filename.size() - 7, 7, ".dat");
new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(),
new_suffix);
auto from = fmt::format("{}/{}", schema_hash_path, filename);
auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename);
RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to));
}

auto meta = store->get_meta();
auto* meta = store->get_meta();
// if ingest binlog metas error, it will be gc in gc_unused_binlog_metas
RETURN_IF_ERROR(
RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb));
Expand Down
14 changes: 10 additions & 4 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,16 @@ namespace {
/// return value: if the binlog file does not exist, returns the destination (`to`) binlog file path
Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
const std::string& clone_file, bool* skip_link_file) {
// change clone_file suffix .binlog to .dat
std::string to;
std::string new_clone_file = clone_file;
new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
if (clone_file.ends_with(".binlog")) {
// change clone_file suffix from .binlog to .dat
new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
} else if (clone_file.ends_with(".binlog-index")) {
// change clone_file suffix from .binlog-index to .idx
new_clone_file.replace(clone_file.size() - 13, 13, ".idx");
}
to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);

// check to to file exist
bool exists = true;
Expand Down Expand Up @@ -670,7 +676,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d

auto from = fmt::format("{}/{}", clone_dir, clone_file);
std::string to;
if (clone_file.ends_with(".binlog")) {
if (clone_file.ends_with(".binlog") || clone_file.ends_with(".binlog-index")) {
if (!contain_binlog) {
LOG(WARNING) << "clone binlog file, but not contain binlog metas. "
<< "tablet=" << tablet->tablet_id() << ", clone_file=" << clone_file;
Expand Down
Loading
Loading