Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 69 additions & 41 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,16 @@ void Recycler::recycle_callback() {
auto instance_recycler = std::make_shared<InstanceRecycler>(
txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);

if (instance_recycler->init() != 0) {
LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id;
if (int r = instance_recycler->init(); r != 0) {
LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id << " ret=" << r;
continue;
}
std::string recycle_job_key;
job_recycle_key({instance_id}, &recycle_job_key);
int ret = prepare_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id,
ip_port_, config::recycle_interval_seconds * 1000);
if (ret != 0) { // Prepare failed
LOG(WARNING) << "failed to prepare recycle_job, instance_id=" << instance_id << " ret=" << ret;
continue;
} else {
std::lock_guard lock(mtx_);
Expand All @@ -276,7 +277,8 @@ void Recycler::recycle_callback() {
std::lock_guard lock(mtx_);
recycling_instance_map_.erase(instance_id);
}
LOG_INFO("finish recycle instance").tag("instance_id", instance_id);
auto elpased_ms = ctime_ms - duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
LOG_INFO("finish recycle instance").tag("instance_id", instance_id).tag("cost_ms", elpased_ms);
}
}

Expand Down Expand Up @@ -529,8 +531,9 @@ int InstanceRecycler::init_storage_vault_accessors() {
int ret = accessor->init();
if (ret != 0) {
LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
<< " resource_id=" << vault.id() << " name=" << vault.name();
return ret;
<< " resource_id=" << vault.id() << " name=" << vault.name()
<< " hdfs_vault=" << vault.hdfs_info().DebugString();
continue;
}

accessor_map_.emplace(vault.id(), std::move(accessor));
Expand All @@ -540,16 +543,18 @@ int InstanceRecycler::init_storage_vault_accessors() {
#else
auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
if (!s3_conf) {
LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
return -1;
LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id="
<< instance_id_ << " s3_vault=" << vault.obj_info().DebugString();
continue;
}

std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret != 0) {
LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
<< " resource_id=" << vault.id() << " name=" << vault.name();
return ret;
<< " resource_id=" << vault.id() << " name=" << vault.name()
<< " ret=" << ret << " s3_vault=" << vault.obj_info().DebugString();
return continue;
}
#endif

Expand All @@ -562,6 +567,13 @@ int InstanceRecycler::init_storage_vault_accessors() {
return -1;
}

if (accessor_map_.empty()) {
LOG(WARNING) << "no accessors for instance=" << instance_id_;
return -2;
}
LOG_INFO("finish init instance recycler number_accessors={} instance=", accessor_map_.size(),
instance_id_);

return 0;
}

Expand Down Expand Up @@ -1461,7 +1473,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
}

auto it = accessor_map_.find(rs.resource_id());
if (it == accessor_map_.end()) [[unlikely]] { // impossible
// possible if the accessor is not initilized correctly
if (it == accessor_map_.end()) [[unlikely]] {
LOG_WARNING("instance has no such resource id")
.tag("instance_id", instance_id_)
.tag("resource_id", rs.resource_id());
Expand Down Expand Up @@ -1545,8 +1558,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
[](const int& ret) { return ret != 0; });
for (auto& [resource_id, file_paths] : resource_file_paths) {
concurrent_delete_executor.add([&, rid = &resource_id, paths = &file_paths]() -> int {
DCHECK(accessor_map_.count(*rid)) << "uninitilized accessor, instance_id=" << instance_id_ << " resource_id=" << resource_id << " path[0]=" << (*paths)[0];
auto& accessor = accessor_map_[*rid];
DCHECK(accessor);
return accessor->delete_files(*paths);
});
}
Expand Down Expand Up @@ -1577,6 +1590,8 @@ int InstanceRecycler::delete_rowset_data(const std::string& resource_id, int64_t
LOG_WARNING("instance has no such resource id")
.tag("instance_id", instance_id_)
.tag("resource_id", resource_id);
.tag("tablet_id", tablet_id);
.tag("rowset_id", rowset_id);
return -1;
}
auto& accessor = it->second;
Expand All @@ -1588,59 +1603,46 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);

int ret = 0;
auto start_time = steady_clock::now();

std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
});

// delete all rowset kv in this tablet
// collect resource ids
std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""});
std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""});

int ret = 0;
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id;
ret = -1;
}
txn->remove(rs_key0, rs_key1);
txn->remove(recyc_rs_key0, recyc_rs_key1);
std::set<std::string> resource_ids;

// remove delete bitmap for MoW table
std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
txn->remove(pending_key);
std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
txn->remove(delete_bitmap_start, delete_bitmap_end);

TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err;
ret = -1;
}
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
.tag("ret", ret);
});

SyncExecutor<int> concurrent_delete_executor(
_thread_pool_group.s3_producer_pool,
fmt::format("delete tablet {} s3 rowset", tablet_id),
[](const int& ret) { return ret != 0; });

// delete all rowset data in this tablet
for (auto& [_, accessor] : accessor_map_) {
// ATTN: there may be data leak if not all accessor initilized successfully
// partial data deleted if the tablet is stored cross-storage vault
// vault id is not attached to TabletMeta...
for (auto& [_, accessor] : accessor_map_) {
concurrent_delete_executor.add([&, accessor_ptr = &accessor]() {
if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id
<< " s3_path=" << accessor->uri();
<< " path=" << accessor->uri();
return -1;
}
return 0;
});
}
// FIXME(gavin): read all RowsetMeta and make a resource id set and then delete_directory

bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
Expand All @@ -1651,6 +1653,32 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {

ret = finished ? ret : -1;

if (ret != 0) { // failed recycle tablet data
return ret;
}

// delete all rowset kv in this tablet
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id;
ret = -1;
}
txn->remove(rs_key0, rs_key1);
txn->remove(recyc_rs_key0, recyc_rs_key1);

// remove delete bitmap for MoW table
std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
txn->remove(pending_key);
std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
txn->remove(delete_bitmap_start, delete_bitmap_end);

TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err;
ret = -1;
}

if (ret == 0) {
// All object files under tablet have been deleted
std::lock_guard lock(recycled_tablets_mtx_);
Expand Down
Loading