Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
rgw: [CORTX-32694] Motr GC Implementation for Multipart Objects
Browse files Browse the repository at this point in the history
Motr GC producer and consumer module implementation for multipart objects

Signed-off-by: Jeet Jain <jeet.jain@seagate.com>
  • Loading branch information
jjxsg committed Aug 30, 2022
1 parent ec121d3 commit a056c28
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 57 deletions.
175 changes: 133 additions & 42 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void *MotrGC::GCWorker::entry() {
do {
std::string gc_log_prefix = "[" + gc_thread_prefix
+ std::to_string(worker_id) + "] ";
ldpp_dout(dpp, 10) << gc_log_prefix << __func__ << " Iteration Started" << dendl;
ldpp_dout(dpp, 10) << gc_log_prefix << __func__ << ": iteration started" << dendl;

std::string iname = "";
// Get lock on an GC index
Expand All @@ -49,20 +49,20 @@ void *MotrGC::GCWorker::entry() {
// form the index name
iname = motr_gc->index_names[my_index];
ldpp_dout(dpp, 10) << gc_log_prefix
<< __func__ << ": Working on GC Queue: " << iname << dendl;
<< __func__ << ": working on GC queue: " << iname << dendl;

bufferlist bl;
std::vector<std::string> keys(motr_gc->max_count + 1);
std::vector<bufferlist> vals(motr_gc->max_count + 1);
uint32_t rgw_gc_obj_min_wait = cct->_conf->rgw_gc_obj_min_wait;
keys[0] = obj_exp_time_prefix;
rc = motr_gc->store->next_query_by_name(iname,
keys, vals, obj_exp_time_prefix);
ldpp_dout(dpp, 0) << gc_log_prefix
ldpp_dout(dpp, 20) << gc_log_prefix
<< __func__ <<": next_query_by_name() rc=" << rc << dendl;
if (rc < 0) {
// In case of failure, worker will keep retrying till end_time
ldpp_dout(dpp, 0) << gc_log_prefix
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ <<": ERROR: NEXT query failed. rc=" << rc << dendl;
continue;
}
Expand All @@ -81,33 +81,54 @@ void *MotrGC::GCWorker::entry() {
// delete motr object
if(ginfo.is_multipart) {
// handle multipart object deletion
// with large number of parts, a gc thread may exhaust its alloted
// time, in that case it will be preempted and remaining parts will
// be processed in next cycle
rc = motr_gc->process_parts(ginfo, end_time);
if (rc < 0) {
ldpp_dout(dpp, 0) << gc_log_prefix << __func__
<< ": ERROR: failed to distribute multipart object "
<< ginfo.name << " with tag " << ginfo.tag << " rc: " << rc << dendl;
continue; // should continue deletion for next objects
}
ldpp_dout(dpp, 20) << gc_log_prefix
<< "successfully distributed multipart object "
<< ginfo.name << " with tag " << ginfo.tag << dendl;
}
else {
// simple object
rc = motr_gc->delete_motr_obj_from_gc(ginfo);
rc = motr_gc->delete_obj_from_gc(ginfo);
if (rc < 0) {
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ <<"ERROR: Motr obj deletion failed for "
<< __func__ <<": ERROR: motr obj deletion failed for "
<< ginfo.tag << " with rc: " << rc << dendl;
continue; // should continue deletion for next objects
}
ldpp_dout(dpp, 20) << gc_log_prefix
<< "successfully deleted object " << ginfo.name
<< " with tag " << ginfo.tag << dendl;
}
// delete entry from GC queue
rc = motr_gc->dequeue(iname, ginfo);
if (rc == 0) {
ldpp_dout(dpp, 20) << gc_log_prefix
<< "successfully deleted GC entry for " << ginfo.name
<< " with tag " << ginfo.tag << dendl;
}
processed_count++;

// Update current time
current_time = std::time(nullptr);
// Exit the loop if required work is complete
if (processed_count >= motr_gc->max_count
if (processed_count >= motr_gc->max_count
|| current_time > end_time || motr_gc->going_down())
break;
}
// unlock the GC queue
motr_gc->un_lock_gc_index(my_index);
} else {
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ << "ERROR: No GC index is available for locking."
<< __func__ << ": ERROR: no GC index is available for locking."
<< " Skipping GC processing" << " rc = " << rc << dendl;
}
my_index = (my_index + 1) % motr_gc->max_indices;
Expand All @@ -119,7 +140,7 @@ void *MotrGC::GCWorker::entry() {
}
} while (!motr_gc->going_down());

ldpp_dout(dpp, 0) << __func__ << ": Stop signal called for "
ldpp_dout(dpp, 0) << __func__ << ": stop signal called for "
<< gc_thread_prefix << worker_id << dendl;
return nullptr;
}
Expand All @@ -133,7 +154,7 @@ int MotrGC::initialize() {
std::string iname = gc_index_prefix + "." + std::to_string(ind_suf);
rc = store->create_motr_idx_by_name(iname);
if (rc < 0 && rc != -EEXIST){
ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl;
ldout(cct, 0) << ": ERROR: GC index creation failed with rc: " << rc << dendl;
break;
}
if (rc == -EEXIST) rc = 0;
Expand All @@ -154,7 +175,7 @@ int MotrGC::initialize() {
rc = kv_lock_provider->initialize(
this, store, global_lock_table);
if (rc < 0) {
ldout(cct, 0) << "ERROR: Failed to initialize lock provider: "
ldout(cct, 0) << "ERROR: failed to initialize lock provider: "
<< rc << dendl;
return rc;
}
Expand Down Expand Up @@ -223,27 +244,97 @@ uint32_t MotrGC::get_max_indices() {
return gc_max_indices;
}

int MotrGC::delete_motr_obj_from_gc(motr_gc_obj_info ginfo) {
int MotrGC::delete_obj_from_gc(motr_gc_obj_info ginfo) {
int rc = delete_motr_obj(ginfo.mobj);
return rc;
}

int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
int rc = 0;
int max_entries = 10000;
int number_of_parts = 0;
int processed_parts = 0;
std::vector<std::string> keys(max_entries);
std::vector<bufferlist> vals(max_entries);

rc = store->next_query_by_name(ginfo.multipart_iname, keys, vals);
if (rc < 0) {
ldout(cct, 0) <<__func__<<": ERROR: next query failed. rc="
<< rc << dendl;
return rc;
}
number_of_parts = rc;
for (const auto& bl: vals) {
if (bl.length() == 0)
break;

RGWUploadPartInfo info;
auto iter = bl.cbegin();
info.decode(iter);
rgw::sal::Attrs attrs_dummy;
decode(attrs_dummy, iter);
Meta mobj;
mobj.decode(iter);
ldout(cct, 20) <<__func__<< ": part_num=" << info.num
<< " part_size=" << info.size << dendl;
std::string tag = PRIx64 + std::to_string(mobj.oid.u_hi) + ":" +
PRIx64 + std::to_string(mobj.oid.u_lo);
std::string part_name = "part.";
char buff[32];
snprintf(buff, sizeof(buff), "%08d", (int)info.num);
part_name.append(buff);

std::string obj_fqdn = ginfo.name + "." + part_name;
motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time,
info.size, false, "");
rc = enqueue(gc_obj);
if (rc < 0) {
ldout(cct, 0) <<__func__<< ": ERROR: failed to push "
<< obj_fqdn << "into GC queue " << dendl;
continue;
}
processed_parts++;
bufferlist bl_del;
rc = store->do_idx_op_by_name(ginfo.multipart_iname,
M0_IC_DEL, part_name, bl_del);
if (rc < 0) {
ldout(cct, 0) <<__func__<< ": ERROR: failed to remove part " << part_name
<< " from part index " << ginfo.multipart_iname << dendl;
}
if (std::time(nullptr) > end_time || going_down()) {
// processing time's up, so return now
return 0;
}
}

if (processed_parts == number_of_parts) {
store->delete_motr_idx_by_name(ginfo.multipart_iname);
}

return rc;
}

int MotrGC::delete_motr_obj(Meta motr_obj) {
int rc;
struct m0_op *op = nullptr;
char fid_str[M0_FID_STR_LEN];
snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&ginfo.mobj.oid));
snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&motr_obj.oid));

if (!ginfo.mobj.oid.u_hi || !ginfo.mobj.oid.u_lo) {
if (!motr_obj.oid.u_hi || !motr_obj.oid.u_lo) {
ldout(cct, 0) <<__func__<< ": invalid motr object oid=" << fid_str << dendl;
return -EINVAL;
}
ldout(cct, 10) <<__func__<< ": deleting motr object oid=" << fid_str << dendl;

// Open the object.
if (ginfo.mobj.layout_id == 0) {
if (motr_obj.layout_id == 0) {
return -ENOENT;
}
auto mobj = new m0_obj();
memset(mobj, 0, sizeof *mobj);
m0_obj_init(mobj, &store->container.co_realm, &ginfo.mobj.oid, store->conf.mc_layout_id);
mobj->ob_attr.oa_layout_id = ginfo.mobj.layout_id;
mobj->ob_attr.oa_pver = ginfo.mobj.pver;
m0_obj_init(mobj, &store->container.co_realm, &motr_obj.oid, store->conf.mc_layout_id);
mobj->ob_attr.oa_layout_id = motr_obj.layout_id;
mobj->ob_attr.oa_pver = motr_obj.pver;
mobj->ob_entity.en_flags |= M0_ENF_META;
rc = m0_entity_open(&mobj->ob_entity, &op);
if (rc != 0) {
Expand Down Expand Up @@ -293,18 +384,17 @@ int MotrGC::delete_motr_obj_from_gc(motr_gc_obj_info ginfo) {
m0_obj_fini(mobj);
delete mobj; mobj = nullptr;
}
ldout(cct, 10) <<__func__<< ": deleted motr object oid="
<< fid_str << " for tag=" << ginfo.tag << dendl;
return 0;
}

int MotrGC::enqueue(motr_gc_obj_info obj) {
int rc = 0;
// create 🔑's:
// - 1_{obj.deletion_time + min_wait_time}
// - 1_{obj.deletion_time}_{obj.tag}
// - 0_{obj.tag}
std::string key1 = obj_exp_time_prefix +
std::to_string(obj.deletion_time + cct->_conf->rgw_gc_obj_min_wait);
std::to_string(obj.deletion_time) + "_" +
obj.tag;
std::string key2 = obj_tag_prefix + obj.tag;

bufferlist bl;
Expand Down Expand Up @@ -337,19 +427,21 @@ int MotrGC::dequeue(std::string iname, motr_gc_obj_info obj) {
bufferlist bl;
std::string tag_key = obj_tag_prefix + obj.tag;
std::string expiry_time_key = obj_exp_time_prefix +
std::to_string(obj.deletion_time + cct->_conf->rgw_gc_obj_min_wait);
std::to_string(obj.deletion_time) +
"_" + obj.tag;
rc = store->do_idx_op_by_name(iname, M0_IC_DEL, tag_key, bl);
if (rc < 0) {
ldout(cct, 0) << "ERROR: failed to delete tag entry "
ldout(cct, 0) << __func__ << ": ERROR: failed to delete tag entry "
<< tag_key << " rc: " << rc << dendl;
}
ldout(cct, 10) << "Deleted tag entry "<< tag_key << dendl;
ldout(cct, 10) << __func__ << ": deleted tag entry " << tag_key << dendl;
rc = store->do_idx_op_by_name(iname, M0_IC_DEL, expiry_time_key, bl);
if (rc < 0 && rc != -EEXIST) {
ldout(cct, 0) << "ERROR: failed to delete time entry "
ldout(cct, 0) << __func__ << ": ERROR: failed to delete time entry "
<< expiry_time_key << " rc: " << rc << dendl;
}
ldout(cct, 10) << "Deleted time entry "<< expiry_time_key << dendl;
ldout(cct, 10) << __func__ << ": deleted time entry "
<< expiry_time_key << dendl;
return rc;
}

Expand All @@ -372,10 +464,11 @@ int MotrGC::get_locked_gc_index(uint32_t& rand_ind,
rc = gc_lock->lock(iname, MotrLockType::EXCLUSIVE,
gc_lease_duration, caller_id);
if (rc < 0) {
ldout(cct, 10) << "Failed to acquire lock: GC Queue =["<< iname << "]"
<< "caller_id =[" << caller_id << "]" << "rc = " << rc << dendl;
ldout(cct, 10) << __func__ << ": failed to acquire lock: GC queue = [" << iname
<< "]" << "caller_id =[" << caller_id << "]" << "rc = " << rc << dendl;
} else {
ldout(cct, 10) << "Acquired lock for GC Queue =["<< iname << "]" << dendl;
ldout(cct, 10) << __func__ << ": acquired lock for GC queue = ["
<< iname << "]" << dendl;
}
}
if (rc == 0)
Expand All @@ -396,7 +489,6 @@ int MotrGC::un_lock_gc_index(uint32_t& index) {
}

int MotrGC::list(std::vector<std::unordered_map<std::string, std::string>> &gc_entries) {
int rc = 0;
int max_entries = 1000;
max_indices = get_max_indices();
for (uint32_t i = 0; i < max_indices; i++) {
Expand All @@ -411,35 +503,34 @@ int MotrGC::list(std::vector<std::unordered_map<std::string, std::string>> &gc_e
if (!marker.empty()) {
keys[0] = marker;
}
rc = store->next_query_by_name(iname, keys, vals, obj_tag_prefix);
int rc = store->next_query_by_name(iname, keys, vals, obj_tag_prefix);
if (rc < 0) {
ldout(cct, 0) <<__func__<<": ERROR: NEXT query failed. rc="
<< rc << dendl;
return rc;
ldout(cct, 0) << __func__ << ": ERROR: next query failed for " << iname
<< " with rc=" << rc << dendl;
continue;
}
if (rc == max_entries + 1) {
truncated = true;
marker = keys.back();
}
for (int j = 0; j < max_entries &&
for (int j = 0; j < max_entries &&
!keys[j].empty() && keys[j] != obj_tag_prefix; j++) {
bufferlist::const_iterator blitr = vals[j].cbegin();
motr_gc_obj_info ginfo;
ginfo.decode(blitr);
std::unordered_map<std::string, std::string> mp;
char t_str[100];
std::string deletion_time;
if (std::strftime(t_str, sizeof(t_str), "%Y-%m-%dT%H:%M:%S%z%Z", std::localtime(&ginfo.deletion_time))) {
if (std::strftime(t_str, sizeof(t_str), "%Y-%m-%dT%H:%M:%S%z%Z",
std::localtime(&ginfo.deletion_time))) {
deletion_time = t_str;
}
else {
} else {
deletion_time = std::to_string(ginfo.deletion_time);
}
mp["tag"] = ginfo.tag;
mp["name"] = ginfo.name;
mp["deletion_time"] = deletion_time;
mp["size"] = std::to_string(ginfo.size);
mp["size_actual"] = std::to_string(ginfo.size_actual);
mp["is_multipart"] = ginfo.is_multipart ? "true" : "false";
gc_entries.push_back(mp);
ldout(cct, 70) << ginfo.tag << ", "
Expand Down
Loading

0 comments on commit a056c28

Please sign in to comment.