From c257c3c1ab2d21d127863d1f4bb3c54850d6261b Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Wed, 25 Dec 2024 18:29:05 +0800 Subject: [PATCH] [fix](recycler) Fix premature exit recycling when there is an invalid storage vault --- cloud/src/recycler/recycler.cpp | 110 ++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 41 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 04476704bd36c7..1368624d7f3303 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -249,8 +249,8 @@ void Recycler::recycle_callback() { auto instance_recycler = std::make_shared( txn_kv_, instance, _thread_pool_group, txn_lazy_committer_); - if (instance_recycler->init() != 0) { - LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id; + if (int r = instance_recycler->init(); r != 0) { + LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id << " ret=" << r; continue; } std::string recycle_job_key; @@ -258,6 +258,7 @@ void Recycler::recycle_callback() { int ret = prepare_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id, ip_port_, config::recycle_interval_seconds * 1000); if (ret != 0) { // Prepare failed + LOG(WARNING) << "failed to prepare recycle_job, instance_id=" << instance_id << " ret=" << ret; continue; } else { std::lock_guard lock(mtx_); @@ -276,7 +277,8 @@ void Recycler::recycle_callback() { std::lock_guard lock(mtx_); recycling_instance_map_.erase(instance_id); } - LOG_INFO("finish recycle instance").tag("instance_id", instance_id); + auto elpased_ms = ctime_ms - duration_cast(system_clock::now().time_since_epoch()).count(); + LOG_INFO("finish recycle instance").tag("instance_id", instance_id).tag("cost_ms", elpased_ms); } } @@ -529,8 +531,9 @@ int InstanceRecycler::init_storage_vault_accessors() { int ret = accessor->init(); if (ret != 0) { LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_ - << " resource_id=" << vault.id() << " name=" << vault.name(); - return ret; + << " resource_id=" << vault.id() << " name=" << vault.name() + << " hdfs_vault=" << vault.hdfs_info().DebugString(); + continue; } accessor_map_.emplace(vault.id(), std::move(accessor)); @@ -540,16 +543,18 @@ int InstanceRecycler::init_storage_vault_accessors() { #else auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info()); if (!s3_conf) { - LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_; - return -1; + LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id=" + << instance_id_ << " s3_vault=" << vault.obj_info().DebugString(); + continue; } std::shared_ptr accessor; int ret = S3Accessor::create(std::move(*s3_conf), &accessor); if (ret != 0) { LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_ - << " resource_id=" << vault.id() << " name=" << vault.name(); - return ret; + << " resource_id=" << vault.id() << " name=" << vault.name() + << " ret=" << ret << " s3_vault=" << vault.obj_info().DebugString(); + return continue; } #endif @@ -562,6 +567,13 @@ int InstanceRecycler::init_storage_vault_accessors() { return -1; } + if (accessor_map_.empty()) { + LOG(WARNING) << "no accessors for instance=" << instance_id_; + return -2; + } + LOG_INFO("finish init instance recycler number_accessors={} instance=", accessor_map_.size(), + instance_id_); + return 0; } @@ -1461,7 +1473,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector int { + DCHECK(accessor_map_.count(*rid)) << "uninitilized accessor, instance_id=" << instance_id_ << " resource_id=" << resource_id << " path[0]=" << (*paths)[0]; auto& accessor = accessor_map_[*rid]; - DCHECK(accessor); return accessor->delete_files(*paths); }); } @@ -1577,6 +1590,8 @@ int InstanceRecycler::delete_rowset_data(const std::string& resource_id, int64_t LOG_WARNING("instance has no such resource id") .tag("instance_id", instance_id_) .tag("resource_id", resource_id); + .tag("tablet_id", tablet_id); + .tag("rowset_id", rowset_id); return -1; } auto& accessor = it->second; @@ -1588,42 +1603,24 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { .tag("instance_id", instance_id_) .tag("tablet_id", tablet_id); + int ret = 0; auto start_time = steady_clock::now(); - std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { - auto cost = duration(steady_clock::now() - start_time).count(); - LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost) - .tag("instance_id", instance_id_) - .tag("tablet_id", tablet_id); - }); - - // delete all rowset kv in this tablet + // collect resource ids std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0}); std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0}); std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""}); std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""}); - int ret = 0; - std::unique_ptr txn; - if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id; - ret = -1; - } - txn->remove(rs_key0, rs_key1); - txn->remove(recyc_rs_key0, recyc_rs_key1); + std::set resource_ids; - // remove delete bitmap for MoW table - std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id}); - txn->remove(pending_key); - std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0}); - std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0}); - txn->remove(delete_bitmap_start, delete_bitmap_end); - - TxnErrorCode err = txn->commit(); - if (err != TxnErrorCode::TXN_OK) { - LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err; - ret = -1; - } + std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { + auto cost = duration(steady_clock::now() - start_time).count(); + LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost) + .tag("instance_id", instance_id_) + .tag("tablet_id", tablet_id); + .tag("ret", ret); + }); SyncExecutor concurrent_delete_executor( _thread_pool_group.s3_producer_pool, @@ -1631,16 +1628,21 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { [](const int& ret) { return ret != 0; }); // delete all rowset data in this tablet - for (auto& [_, accessor] : accessor_map_) { + // ATTN: there may be data leak if not all accessor initilized successfully + // partial data deleted if the tablet is stored cross-storage vault + // vault id is not attached to TabletMeta... + for (auto& [_, accessor] : accessor_map_) { concurrent_delete_executor.add([&, accessor_ptr = &accessor]() { if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) { LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id - << " s3_path=" << accessor->uri(); + << " path=" << accessor->uri(); return -1; } return 0; }); } + // FIXME(gavin): read all RowsetMeta and make a resource id set and then delete_directory + bool finished = true; std::vector rets = concurrent_delete_executor.when_all(&finished); for (int r : rets) { @@ -1651,6 +1653,32 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) { ret = finished ? ret : -1; + if (ret != 0) { // failed recycle tablet data + return ret; + } + + // delete all rowset kv in this tablet + std::unique_ptr txn; + if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id; + ret = -1; + } + txn->remove(rs_key0, rs_key1); + txn->remove(recyc_rs_key0, recyc_rs_key1); + + // remove delete bitmap for MoW table + std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id}); + txn->remove(pending_key); + std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0}); + std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0}); + txn->remove(delete_bitmap_start, delete_bitmap_end); + + TxnErrorCode err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err; + ret = -1; + } + if (ret == 0) { // All object files under tablet have been deleted std::lock_guard lock(recycled_tablets_mtx_);