From 0a8186823637b20dbcdb027269e8f262cbee49b1 Mon Sep 17 00:00:00 2001 From: qiye Date: Mon, 11 Mar 2024 11:07:30 +0800 Subject: [PATCH] [improvement](binlog)Support inverted index in CCR (#31743) --- be/src/http/action/download_binlog_action.cpp | 39 ++++++ be/src/olap/rowset/beta_rowset.cpp | 44 +++++- be/src/olap/snapshot_manager.cpp | 53 ++++++- be/src/olap/tablet.cpp | 26 +++- be/src/olap/tablet.h | 5 + be/src/olap/tablet_manager.cpp | 17 ++- be/src/olap/task/engine_clone_task.cpp | 14 +- be/src/service/backend_service.cpp | 132 +++++++++++++++++- 8 files changed, 308 insertions(+), 22 deletions(-) diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index 697512b2a301ee..dbe2880d3b4c02 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -47,6 +47,7 @@ const std::string kTabletIdParameter = "tablet_id"; const std::string kBinlogVersionParameter = "binlog_version"; const std::string kRowsetIdParameter = "rowset_id"; const std::string kSegmentIndexParameter = "segment_index"; +const std::string kSegmentIndexIdParameter = "segment_index_id"; // get http param, if no value throw exception const auto& get_http_param(HttpRequest* req, const std::string& param_name) { @@ -130,6 +131,42 @@ void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rat do_file_response(segment_file_path, req, rate_limit_group); } +/// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id +void handle_get_segment_index_file(HttpRequest* req, + bufferevent_rate_limit_group* rate_limit_group) { + // Step 1: get download file path + std::string segment_index_file_path; + try { + const auto& tablet_id = get_http_param(req, kTabletIdParameter); + auto tablet = get_tablet(tablet_id); + const auto& rowset_id = get_http_param(req, kRowsetIdParameter); + const auto& segment_index = get_http_param(req, kSegmentIndexParameter); + const auto& segment_index_id = req->param(kSegmentIndexIdParameter); + segment_index_file_path = + tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id); + } catch (const std::exception& e) { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what()); + LOG(WARNING) << "get download file path failed, error: " << e.what(); + return; + } + + // Step 2: handle download + // check file exists + bool exists = false; + Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists); + if (!status.ok()) { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string()); + LOG(WARNING) << "check file exists failed, error: " << status.to_string(); + return; + } + if (!exists) { + HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist."); + LOG(WARNING) << "file not exist, file path: " << segment_index_file_path; + return; + } + do_file_response(segment_index_file_path, req, rate_limit_group); +} + void handle_get_rowset_meta(HttpRequest* req) { try { const auto& tablet_id = get_http_param(req, kTabletIdParameter); @@ -183,6 +220,8 @@ void DownloadBinlogAction::handle(HttpRequest* req) { handle_get_binlog_info(req); } else if (method == "get_segment_file") { handle_get_segment_file(req, _rate_limit_group.get()); + } else if (method == "get_segment_index_file") { + handle_get_segment_index_file(req, _rate_limit_group.get()); } else if (method == "get_rowset_meta") { handle_get_rowset_meta(req); } else { diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index e3f28726c4d68a..58e495be22265c 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -439,7 +439,7 @@ Status BetaRowset::add_to_binlog() { if (fs->type() != io::FileSystemType::LOCAL) { return Status::InternalError("should be local file system"); } - io::LocalFileSystem* local_fs = static_cast(fs.get()); + auto* local_fs = static_cast(fs.get()); // all segments are in the same directory, so cache binlog_dir without multi times check std::string binlog_dir; @@ -447,6 +447,22 @@ Status BetaRowset::add_to_binlog() { auto segments_num = num_segments(); VLOG_DEBUG << fmt::format("add rowset to binlog. rowset_id={}, segments_num={}", rowset_id().to_string(), segments_num); + + Status status; + std::vector linked_success_files; + Defer remove_linked_files {[&]() { // clear linked files if errors happen + if (!status.ok()) { + LOG(WARNING) << "will delete linked success files due to error " << status; + std::vector paths; + for (auto& file : linked_success_files) { + paths.emplace_back(file); + LOG(WARNING) << "will delete linked success file " << file << " due to error"; + } + static_cast(local_fs->batch_delete(paths)); + LOG(WARNING) << "done delete linked success files due to error " << status; + } + }}; + for (int i = 0; i < segments_num; ++i) { auto seg_file = segment_file_path(i); @@ -465,8 +481,30 @@ Status BetaRowset::add_to_binlog() { .string(); VLOG_DEBUG << "link " << seg_file << " to " << binlog_file; if (!local_fs->link_file(seg_file, binlog_file).ok()) { - return Status::Error("fail to create hard link. from={}, to={}, errno={}", - seg_file, binlog_file, Errno::no()); + status = Status::Error("fail to create hard link. from={}, to={}, errno={}", + seg_file, binlog_file, Errno::no()); + return status; + } + linked_success_files.push_back(binlog_file); + + for (const auto& index : _schema->indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + auto index_id = index.index_id(); + auto index_file = InvertedIndexDescriptor::get_index_file_name( + seg_file, index_id, index.get_index_suffix()); + auto binlog_index_file = (std::filesystem::path(binlog_dir) / + std::filesystem::path(index_file).filename()) + .string(); + VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file; + if (!local_fs->link_file(index_file, binlog_index_file).ok()) { + status = Status::Error( + "fail to create hard link. from={}, to={}, errno={}", index_file, + binlog_index_file, Errno::no()); + return status; + } + linked_success_files.push_back(binlog_index_file); } } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index e1127fa16106be..0fa4e5918543b8 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -640,11 +640,39 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet break; } - for (auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) { + for (const auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) { std::string segment_file_path; auto num_segments = rowset_binlog_meta.num_segments(); std::string_view rowset_id = rowset_binlog_meta.rowset_id(); + RowsetMetaPB rowset_meta_pb; + if (!rowset_meta_pb.ParseFromString(rowset_binlog_meta.data())) { + auto err_msg = fmt::format("fail to parse binlog meta data value:{}", + rowset_binlog_meta.data()); + res = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return res; + } + const auto& tablet_schema_pb = rowset_meta_pb.tablet_schema(); + TabletSchema tablet_schema; + tablet_schema.init_from_pb(tablet_schema_pb); + + std::vector linked_success_files; + Defer remove_linked_files {[&]() { // clear linked files if errors happen + if (!res.ok()) { + LOG(WARNING) << "will delete linked success files due to error " << res; + std::vector paths; + for (auto& file : linked_success_files) { + paths.emplace_back(file); + LOG(WARNING) + << "will delete linked success file " << file << " due to error"; + } + static_cast(io::global_local_filesystem()->batch_delete(paths)); + LOG(WARNING) << "done delete linked success files due to error " << res; + } + }}; + + // link segment files and index files for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { segment_file_path = ref_tablet->get_segment_filepath(rowset_id, segment_index); auto snapshot_segment_file_path = @@ -657,6 +685,29 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet << ", dest=" << snapshot_segment_file_path << "]"; break; } + linked_success_files.push_back(snapshot_segment_file_path); + + for (const auto& index : tablet_schema.indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + auto index_id = index.index_id(); + auto index_file = ref_tablet->get_segment_index_filepath( + rowset_id, segment_index, index_id); + auto snapshot_segment_index_file_path = + fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id, + segment_index, index_id); + VLOG_DEBUG << "link " << index_file << " to " + << snapshot_segment_index_file_path; + res = io::global_local_filesystem()->link_file( + index_file, snapshot_segment_index_file_path); + if (!res.ok()) { + LOG(WARNING) << "fail to link binlog index file. [src=" << index_file + << ", dest=" << snapshot_segment_index_file_path << "]"; + break; + } + linked_success_files.push_back(snapshot_segment_index_file_path); + } } if (!res.ok()) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index bfe31843ed1c31..2bf1cd209ca06e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3572,6 +3572,19 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); } +std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, + std::string_view segment_index, + std::string_view index_id) const { + // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged + return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id); +} + +std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index, + int64_t index_id) const { + // TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged + return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id); +} + std::vector Tablet::get_binlog_filepath(std::string_view binlog_version) const { const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version); std::vector binlog_filepath; @@ -3601,7 +3614,6 @@ void Tablet::gc_binlogs(int64_t version) { const auto& tablet_uid = this->tablet_uid(); const auto tablet_id = this->tablet_id(); - const auto& tablet_path = this->tablet_path(); std::string begin_key = make_binlog_meta_key_prefix(tablet_uid); std::string end_key = make_binlog_meta_key_prefix(tablet_uid, version + 1); LOG(INFO) << fmt::format("gc binlog meta, tablet_id:{}, begin_key:{}, end_key:{}", tablet_id, @@ -3615,10 +3627,16 @@ void Tablet::gc_binlogs(int64_t version) { wait_for_deleted_binlog_keys.emplace_back(key); wait_for_deleted_binlog_keys.push_back(get_binlog_data_key_from_meta_key(key)); + // add binlog segment files and index files for (int64_t i = 0; i < num_segments; ++i) { - auto segment_file = fmt::format("{}_{}.dat", rowset_id, i); - wait_for_deleted_binlog_files.emplace_back( - fmt::format("{}/_binlog/{}", tablet_path, segment_file)); + wait_for_deleted_binlog_files.emplace_back(get_segment_filepath(rowset_id, i)); + for (const auto& index : this->tablet_schema()->indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + wait_for_deleted_binlog_files.emplace_back( + get_segment_index_filepath(rowset_id, i, index.index_id())); + } } }; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 22721a835a3ba3..1784e006d71468 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -519,6 +519,11 @@ class Tablet final : public BaseTablet { std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const; + std::string get_segment_index_filepath(std::string_view rowset_id, + std::string_view segment_index, + std::string_view index_id) const; + std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index, + int64_t index_id) const; bool can_add_binlog(uint64_t total_binlog_size) const; void gc_binlogs(int64_t version); Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb); diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index baafa7f3c707cf..5293a13bf84bc7 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -944,19 +944,28 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists)); for (auto& file : files) { auto& filename = file.file_name; - if (!filename.ends_with(".binlog")) { + std::string new_suffix; + std::string old_suffix; + + if (filename.ends_with(".binlog")) { + old_suffix = ".binlog"; + new_suffix = ".dat"; + } else if (filename.ends_with(".binlog-index")) { + old_suffix = ".binlog-index"; + new_suffix = ".idx"; + } else { continue; } - // change clone_file suffix .binlog to .dat std::string new_filename = filename; - new_filename.replace(filename.size() - 7, 7, ".dat"); + new_filename.replace(filename.size() - old_suffix.size(), old_suffix.size(), + new_suffix); auto from = fmt::format("{}/{}", schema_hash_path, filename); auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename); RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to)); } - auto meta = store->get_meta(); + auto* meta = store->get_meta(); // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas RETURN_IF_ERROR( RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb)); diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index b0b2e3e3873d3a..571b3208aec5af 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -83,10 +83,16 @@ namespace { /// return value: if binlog file not exist, then return to binlog file path Result check_dest_binlog_valid(const std::string& tablet_dir, const std::string& clone_file, bool* skip_link_file) { - // change clone_file suffix .binlog to .dat + std::string to; std::string new_clone_file = clone_file; - new_clone_file.replace(clone_file.size() - 7, 7, ".dat"); - auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file); + if (clone_file.ends_with(".binlog")) { + // change clone_file suffix from .binlog to .dat + new_clone_file.replace(clone_file.size() - 7, 7, ".dat"); + } else if (clone_file.ends_with(".binlog-index")) { + // change clone_file suffix from .binlog-index to .idx + new_clone_file.replace(clone_file.size() - 13, 13, ".idx"); + } + to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file); // check to to file exist bool exists = true; @@ -671,7 +677,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d auto from = fmt::format("{}/{}", clone_dir, clone_file); std::string to; - if (clone_file.ends_with(".binlog")) { + if (clone_file.ends_with(".binlog") || clone_file.ends_with(".binlog-index")) { if (!contain_binlog) { LOG(WARNING) << "clone binlog file, but not contain binlog metas. " << "tablet=" << tablet->tablet_id() << ", clone_file=" << clone_file; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 643588c8e333b9..d7a5c8584cd1ea 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -104,12 +104,22 @@ void _ingest_binlog(IngestBinlogArg* arg) { auto& request = arg->request; TStatus tstatus; + std::vector download_success_files; Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() { LOG(INFO) << "ingest binlog. result: " << apache::thrift::ThriftDebugString(tstatus); if (tstatus.status_code != TStatusCode::OK) { // abort txn StorageEngine::instance()->txn_manager()->abort_txn(partition_id, txn_id, local_tablet_id, local_tablet_uid); + // delete all successfully downloaded files + LOG(WARNING) << "will delete downloaded success files due to error " << tstatus; + std::vector paths; + for (const auto& file : download_success_files) { + paths.emplace_back(file); + LOG(WARNING) << "will delete downloaded success file " << file << " due to error"; + } + static_cast(io::global_local_filesystem()->batch_delete(paths)); + LOG(WARNING) << "done delete downloaded success files due to error " << tstatus; } if (ingest_binlog_tstatus) { @@ -226,7 +236,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { // Step 5.2: check data capacity uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), - 0); // NOLINT(bugprone-fold-init-type) + 0ULL); // NOLINT(bugprone-fold-init-type) if (!local_tablet->can_add_binlog(total_size)) { LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size << ", tablet=" << local_tablet->tablet_id(); @@ -251,10 +261,11 @@ void _ingest_binlog(IngestBinlogArg* arg) { LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url, local_segment_path); auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size, - estimate_timeout](HttpClient* client) { + estimate_timeout, &download_success_files](HttpClient* client) { RETURN_IF_ERROR(client->init(get_segment_file_url)); client->set_timeout_ms(estimate_timeout * 1000); RETURN_IF_ERROR(client->download(local_segment_path)); + download_success_files.push_back(local_segment_path); std::error_code ec; // Check file length @@ -284,8 +295,117 @@ void _ingest_binlog(IngestBinlogArg* arg) { } } - // Step 6: create rowset && calculate delete bitmap && commit - // Step 6.1: create rowset + // Step 6: get all segment index files + // Step 6.1: get all segment index files size + std::vector segment_index_file_urls; + std::vector segment_index_file_sizes; + std::vector segment_index_file_names; + auto tablet_schema = rowset_meta->tablet_schema(); + for (const auto& index : tablet_schema->indexes()) { + if (index.index_type() != IndexType::INVERTED) { + continue; + } + auto index_id = index.index_id(); + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + auto get_segment_index_file_size_url = fmt::format( + "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={" + "}", + binlog_api_url, "get_segment_index_file", request.remote_tablet_id, + remote_rowset_id, segment_index, index_id); + uint64_t segment_index_file_size; + auto get_segment_index_file_size_cb = [&get_segment_index_file_size_url, + &segment_index_file_size](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_segment_index_file_size_url)); + client->set_timeout_ms(kMaxTimeoutMs); + RETURN_IF_ERROR(client->head()); + return client->get_content_length(&segment_index_file_size); + }; + auto index_file = InvertedIndexDescriptor::inverted_index_file_path( + local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index, index_id, + index.get_index_suffix()); + segment_index_file_names.push_back(index_file); + + status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_size_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get segment file size from " + << get_segment_index_file_size_url + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + + segment_index_file_sizes.push_back(segment_index_file_size); + segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url)); + } + } + + // Step 6.2: check data capacity + uint64_t total_index_size = + std::accumulate(segment_index_file_sizes.begin(), segment_index_file_sizes.end(), + 0ULL); // NOLINT(bugprone-fold-init-type) + if (!local_tablet->can_add_binlog(total_index_size)) { + LOG(WARNING) << "failed to add binlog, no enough space, total_index_size=" + << total_index_size << ", tablet=" << local_tablet->tablet_id(); + status = Status::InternalError("no enough space"); + status.to_thrift(&tstatus); + return; + } + + // Step 6.3: get all segment index files + DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size()); + DCHECK(segment_index_file_names.size() == segment_index_file_urls.size()); + for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) { + auto segment_index_file_size = segment_index_file_sizes[i]; + auto get_segment_index_file_url = segment_index_file_urls[i]; + + uint64_t estimate_timeout = + segment_index_file_size / config::download_low_speed_limit_kbps / 1024; + if (estimate_timeout < config::download_low_speed_time) { + estimate_timeout = config::download_low_speed_time; + } + + auto local_segment_index_path = segment_index_file_names[i]; + LOG(INFO) << fmt::format("download segment index file from {} to {}", + get_segment_index_file_url, local_segment_index_path); + auto get_segment_index_file_cb = [&get_segment_index_file_url, &local_segment_index_path, + segment_index_file_size, estimate_timeout, + &download_success_files](HttpClient* client) { + RETURN_IF_ERROR(client->init(get_segment_index_file_url)); + client->set_timeout_ms(estimate_timeout * 1000); + RETURN_IF_ERROR(client->download(local_segment_index_path)); + download_success_files.push_back(local_segment_index_path); + + std::error_code ec; + // Check file length + uint64_t local_index_file_size = + std::filesystem::file_size(local_segment_index_path, ec); + if (ec) { + LOG(WARNING) << "download index file error" << ec.message(); + return Status::IOError("can't retrive file_size of {}, due to {}", + local_segment_index_path, ec.message()); + } + if (local_index_file_size != segment_index_file_size) { + LOG(WARNING) << "download index file length error" + << ", get_segment_index_file_url=" << get_segment_index_file_url + << ", index_file_size=" << segment_index_file_size + << ", local_index_file_size=" << local_index_file_size; + return Status::InternalError("downloaded index file size is not equal"); + } + return io::global_local_filesystem()->permission(local_segment_index_path, + io::LocalFileSystem::PERMS_OWNER_RW); + }; + + status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_cb); + if (!status.ok()) { + LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url + << ", status=" << status.to_string(); + status.to_thrift(&tstatus); + return; + } + } + + // Step 7: create rowset && calculate delete bitmap && commit + // Step 7.1: create rowset RowsetSharedPtr rowset; status = RowsetFactory::create_rowset(local_tablet->tablet_schema(), local_tablet->tablet_path(), rowset_meta, &rowset); @@ -300,7 +420,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { return; } - // Step 6.2 calculate delete bitmap before commit + // Step 7.2 calculate delete bitmap before commit auto calc_delete_bitmap_token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); DeleteBitmapPtr delete_bitmap = std::make_shared(local_tablet_id); @@ -336,7 +456,7 @@ void _ingest_binlog(IngestBinlogArg* arg) { static_cast(calc_delete_bitmap_token->wait()); } - // Step 6.3: commit txn + // Step 7.3: commit txn Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn( local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),