Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 177 additions & 44 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>

#include "common/stopwatch.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_schema.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
Expand Down Expand Up @@ -249,15 +251,18 @@ void Recycler::recycle_callback() {
auto instance_recycler = std::make_shared<InstanceRecycler>(
txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);

if (instance_recycler->init() != 0) {
LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id;
if (int r = instance_recycler->init(); r != 0) {
LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id
<< " ret=" << r;
continue;
}
std::string recycle_job_key;
job_recycle_key({instance_id}, &recycle_job_key);
int ret = prepare_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id,
ip_port_, config::recycle_interval_seconds * 1000);
if (ret != 0) { // Prepare failed
LOG(WARNING) << "failed to prepare recycle_job, instance_id=" << instance_id
<< " ret=" << ret;
continue;
} else {
std::lock_guard lock(mtx_);
Expand All @@ -276,7 +281,12 @@ void Recycler::recycle_callback() {
std::lock_guard lock(mtx_);
recycling_instance_map_.erase(instance_id);
}
LOG_INFO("finish recycle instance").tag("instance_id", instance_id);
auto elpased_ms =
ctime_ms -
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
LOG_INFO("finish recycle instance")
.tag("instance_id", instance_id)
.tag("cost_ms", elpased_ms);
}
}

Expand Down Expand Up @@ -523,35 +533,37 @@ int InstanceRecycler::init_storage_vault_accessors() {
LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
return -1;
}
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
&accessor_map_, &vault);

if (vault.has_hdfs_info()) {
auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
int ret = accessor->init();
if (ret != 0) {
LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
<< " resource_id=" << vault.id() << " name=" << vault.name();
return ret;
<< " resource_id=" << vault.id() << " name=" << vault.name()
<< " hdfs_vault=" << vault.hdfs_info().ShortDebugString();
continue;
}

accessor_map_.emplace(vault.id(), std::move(accessor));
} else if (vault.has_obj_info()) {
#ifdef UNIT_TEST
auto accessor = std::make_shared<MockAccessor>();
#else
auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
if (!s3_conf) {
LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
return -1;
LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id="
<< instance_id_ << " s3_vault=" << vault.obj_info().ShortDebugString();
continue;
}

std::shared_ptr<S3Accessor> accessor;
int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret != 0) {
LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
<< " resource_id=" << vault.id() << " name=" << vault.name();
return ret;
<< " resource_id=" << vault.id() << " name=" << vault.name()
<< " ret=" << ret
<< " s3_vault=" << vault.obj_info().ShortDebugString();
continue;
}
#endif

accessor_map_.emplace(vault.id(), std::move(accessor));
}
Expand All @@ -562,6 +574,13 @@ int InstanceRecycler::init_storage_vault_accessors() {
return -1;
}

if (accessor_map_.empty()) {
LOG(WARNING) << "no accessors for instance=" << instance_id_;
return -2;
}
LOG_INFO("finish init instance recycler number_accessors={} instance=", accessor_map_.size(),
instance_id_);

return 0;
}

Expand Down Expand Up @@ -1461,7 +1480,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
}

auto it = accessor_map_.find(rs.resource_id());
if (it == accessor_map_.end()) [[unlikely]] { // impossible
// possible if the accessor is not initilized correctly
if (it == accessor_map_.end()) [[unlikely]] {
LOG_WARNING("instance has no such resource id")
.tag("instance_id", instance_id_)
.tag("resource_id", rs.resource_id());
Expand Down Expand Up @@ -1545,8 +1565,16 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
[](const int& ret) { return ret != 0; });
for (auto& [resource_id, file_paths] : resource_file_paths) {
concurrent_delete_executor.add([&, rid = &resource_id, paths = &file_paths]() -> int {
DCHECK(accessor_map_.count(*rid))
<< "uninitilized accessor, instance_id=" << instance_id_
<< " resource_id=" << resource_id << " path[0]=" << (*paths)[0];
if (!accessor_map_.contains(*rid)) {
LOG_WARNING("delete rowset data accessor_map_ does not contains resouce id")
.tag("resource_id", resource_id)
.tag("instance_id", instance_id_);
return -1;
}
auto& accessor = accessor_map_[*rid];
DCHECK(accessor);
return accessor->delete_files(*paths);
});
}
Expand Down Expand Up @@ -1576,7 +1604,9 @@ int InstanceRecycler::delete_rowset_data(const std::string& resource_id, int64_t
if (it == accessor_map_.end()) {
LOG_WARNING("instance has no such resource id")
.tag("instance_id", instance_id_)
.tag("resource_id", resource_id);
.tag("resource_id", resource_id)
.tag("tablet_id", tablet_id)
.tag("rowset_id", rowset_id);
return -1;
}
auto& accessor = it->second;
Expand All @@ -1588,59 +1618,128 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);

int ret = 0;
auto start_time = steady_clock::now();

// collect resource ids
std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""});
std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""});

std::set<std::string> resource_ids;
int64_t recycle_rowsets_number = 0;
int64_t recycle_segments_number = 0;
int64_t recycle_rowsets_data_size = 0;
int64_t recycle_rowsets_index_size = 0;
int64_t max_rowset_version = 0;
int64_t min_rowset_creation_time = INT64_MAX;
int64_t max_rowset_creation_time = 0;
int64_t min_rowset_expiration_time = INT64_MAX;
int64_t max_rowset_expiration_time = 0;

std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
.tag("tablet_id", tablet_id)
.tag("recycle rowsets number", recycle_rowsets_number)
.tag("recycle segments number", recycle_segments_number)
.tag("all rowsets recycle data size", recycle_rowsets_data_size)
.tag("all rowsets recycle index size", recycle_rowsets_index_size)
.tag("max rowset version", max_rowset_version)
.tag("min rowset creation time", min_rowset_creation_time)
.tag("max rowset creation time", max_rowset_creation_time)
.tag("min rowset expiration time", min_rowset_expiration_time)
.tag("max rowset expiration time", max_rowset_expiration_time)
.tag("ret", ret);
});

// delete all rowset kv in this tablet
std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""});
std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""});

int ret = 0;
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id;
LOG_WARNING("failed to recycle tablet ")
.tag("tablet id", tablet_id)
.tag("instance_id", instance_id_)
.tag("reason", "failed to create txn");
ret = -1;
}
txn->remove(rs_key0, rs_key1);
txn->remove(recyc_rs_key0, recyc_rs_key1);

// remove delete bitmap for MoW table
std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
txn->remove(pending_key);
std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
txn->remove(delete_bitmap_start, delete_bitmap_end);

TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err;
GetRowsetResponse resp;
std::string msg;
MetaServiceCode code = MetaServiceCode::OK;
// get rowsets in tablet
internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
tablet_id, code, msg, &resp);
if (code != MetaServiceCode::OK) {
LOG_WARNING("failed to get rowsets of tablet when recycle tablet")
.tag("tablet id", tablet_id)
.tag("msg", msg)
.tag("code", code)
.tag("instance id", instance_id_);
ret = -1;
}
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta", &resp);

for (const auto& rs_meta : resp.rowset_meta()) {
if (!rs_meta.has_resource_id()) {
LOG_WARNING("rowset meta does not have a resource id, impossible!")
.tag("rs_meta", rs_meta.ShortDebugString())
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
return -1;
}
auto it = accessor_map_.find(rs_meta.resource_id());
// possible if the accessor is not initilized correctly
if (it == accessor_map_.end()) [[unlikely]] {
LOG_WARNING(
"failed to find resource id when recycle tablet, skip this vault accessor "
"recycle process")
.tag("tablet id", tablet_id)
.tag("instance_id", instance_id_)
.tag("resource_id", rs_meta.resource_id())
.tag("rowset meta pb", rs_meta.ShortDebugString());
return -1;
}
recycle_rowsets_number += 1;
recycle_segments_number += rs_meta.num_segments();
recycle_rowsets_data_size += rs_meta.data_disk_size();
recycle_rowsets_index_size += rs_meta.index_disk_size();
max_rowset_version = std::max(max_rowset_version, rs_meta.end_version());
min_rowset_creation_time = std::min(min_rowset_creation_time, rs_meta.creation_time());
max_rowset_creation_time = std::max(max_rowset_creation_time, rs_meta.creation_time());
min_rowset_expiration_time = std::min(min_rowset_expiration_time, rs_meta.txn_expiration());
max_rowset_expiration_time = std::max(max_rowset_expiration_time, rs_meta.txn_expiration());
resource_ids.emplace(rs_meta.resource_id());
}

LOG_INFO("recycle tablet start to delete object")
.tag("instance id", instance_id_)
.tag("tablet id", tablet_id)
.tag("recycle tablet resource ids are",
std::accumulate(resource_ids.begin(), resource_ids.end(), std::string(),
[](const std::string& a, const std::string& b) {
return a.empty() ? b : a + "," + b;
}));

SyncExecutor<int> concurrent_delete_executor(
_thread_pool_group.s3_producer_pool,
fmt::format("delete tablet {} s3 rowset", tablet_id),
[](const int& ret) { return ret != 0; });

// delete all rowset data in this tablet
for (auto& [_, accessor] : accessor_map_) {
concurrent_delete_executor.add([&, accessor_ptr = &accessor]() {
if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
// ATTN: there may be data leak if not all accessor initilized successfully
// partial data deleted if the tablet is stored cross-storage vault
// vault id is not attached to TabletMeta...
for (const auto& resource_id : resource_ids) {
concurrent_delete_executor.add([&, accessor_ptr = accessor_map_[resource_id]]() {
if (accessor_ptr->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id
<< " s3_path=" << accessor->uri();
<< " path=" << accessor_ptr->uri();
return -1;
}
return 0;
});
}

bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
Expand All @@ -1651,6 +1750,40 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {

ret = finished ? ret : -1;

if (ret != 0) { // failed recycle tablet data
LOG_WARNING("ret!=0")
.tag("finished", finished)
.tag("ret", ret)
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id);
return ret;
}

txn.reset();
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to recycle tablet ")
.tag("tablet id", tablet_id)
.tag("instance_id", instance_id_)
.tag("reason", "failed to create txn");
ret = -1;
}
// delete all rowset kv in this tablet
txn->remove(rs_key0, rs_key1);
txn->remove(recyc_rs_key0, recyc_rs_key1);

// remove delete bitmap for MoW table
std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
txn->remove(pending_key);
std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
txn->remove(delete_bitmap_start, delete_bitmap_end);

TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err;
ret = -1;
}

if (ret == 0) {
// All object files under tablet have been deleted
std::lock_guard lock(recycled_tablets_mtx_);
Expand Down Expand Up @@ -2233,7 +2366,7 @@ int InstanceRecycler::abort_timeout_txn() {
txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);
txn_info.set_finish_time(current_time);
txn_info.set_reason("timeout");
VLOG_DEBUG << "txn_info=" << txn_info.DebugString();
VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString();
txn_inf_val.clear();
if (!txn_info.SerializeToString(&txn_inf_val)) {
LOG_WARNING("failed to serialize txn info").tag("key", hex(k));
Expand Down Expand Up @@ -2911,7 +3044,7 @@ int InstanceRecycler::recycle_expired_stage_objects() {
const auto& old_obj = instance_info_.obj_info()[idx - 1];
auto s3_conf = S3Conf::from_obj_store_info(old_obj);
if (!s3_conf) {
LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.DebugString();
LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.ShortDebugString();
continue;
}

Expand Down
Loading
Loading