Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
rgw: [CORTX-32694] Motr GC Implementation for Multipart Objects
Browse files Browse the repository at this point in the history
Motr GC producer and consumer module implementation for multipart objects

Signed-off-by: Jeet Jain <jeet.jain@seagate.com>
  • Loading branch information
jjxsg committed Aug 29, 2022
1 parent ec121d3 commit b172e2b
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 39 deletions.
160 changes: 123 additions & 37 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void *MotrGC::GCWorker::entry() {
do {
std::string gc_log_prefix = "[" + gc_thread_prefix
+ std::to_string(worker_id) + "] ";
ldpp_dout(dpp, 10) << gc_log_prefix << __func__ << " Iteration Started" << dendl;
ldpp_dout(dpp, 10) << gc_log_prefix << __func__ << ": iteration started" << dendl;

std::string iname = "";
// Get lock on an GC index
Expand All @@ -49,20 +49,20 @@ void *MotrGC::GCWorker::entry() {
// form the index name
iname = motr_gc->index_names[my_index];
ldpp_dout(dpp, 10) << gc_log_prefix
<< __func__ << ": Working on GC Queue: " << iname << dendl;
<< __func__ << ": working on GC queue: " << iname << dendl;

bufferlist bl;
std::vector<std::string> keys(motr_gc->max_count + 1);
std::vector<bufferlist> vals(motr_gc->max_count + 1);
uint32_t rgw_gc_obj_min_wait = cct->_conf->rgw_gc_obj_min_wait;
keys[0] = obj_exp_time_prefix;
rc = motr_gc->store->next_query_by_name(iname,
keys, vals, obj_exp_time_prefix);
ldpp_dout(dpp, 0) << gc_log_prefix
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ <<": next_query_by_name() rc=" << rc << dendl;
if (rc < 0) {
// In case of failure, worker will keep retrying till end_time
ldpp_dout(dpp, 0) << gc_log_prefix
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ <<": ERROR: NEXT query failed. rc=" << rc << dendl;
continue;
}
Expand All @@ -81,33 +81,48 @@ void *MotrGC::GCWorker::entry() {
// delete motr object
if(ginfo.is_multipart) {
// handle multipart object deletion
// with large number of parts, a gc thread may exhaust its alloted
// time, in that case it will be preempted and remaining parts will
// be processed in next cycle
rc = motr_gc->process_parts(ginfo, end_time);
if (rc < 0) {
ldpp_dout(dpp, 0) << gc_log_prefix << __func__
<< ": ERROR: failed to distribute multipart object "
<< ginfo.name << " with tag " << ginfo.tag << " rc: " << rc << dendl;
continue; // should continue deletion for next objects
}
}
else {
// simple object
rc = motr_gc->delete_motr_obj_from_gc(ginfo);
rc = motr_gc->delete_obj_from_gc(ginfo);
if (rc < 0) {
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ <<"ERROR: Motr obj deletion failed for "
<< __func__ <<": ERROR: motr obj deletion failed for "
<< ginfo.tag << " with rc: " << rc << dendl;
continue; // should continue deletion for next objects
}
}
// delete entry from GC queue
rc = motr_gc->dequeue(iname, ginfo);
if (rc == 0) {
ldpp_dout(dpp, 20) << gc_log_prefix << __func__
<< ": successfully deleted object " << ginfo.name
<< " with tag " << ginfo.tag << dendl;
}
processed_count++;

// Update current time
current_time = std::time(nullptr);
// Exit the loop if required work is complete
if (processed_count >= motr_gc->max_count
if (processed_count >= motr_gc->max_count
|| current_time > end_time || motr_gc->going_down())
break;
}
// unlock the GC queue
motr_gc->un_lock_gc_index(my_index);
} else {
ldpp_dout(dpp, 0) << gc_log_prefix
<< __func__ << "ERROR: No GC index is available for locking."
<< __func__ << ": ERROR: no GC index is available for locking."
<< " Skipping GC processing" << " rc = " << rc << dendl;
}
my_index = (my_index + 1) % motr_gc->max_indices;
Expand All @@ -119,7 +134,7 @@ void *MotrGC::GCWorker::entry() {
}
} while (!motr_gc->going_down());

ldpp_dout(dpp, 0) << __func__ << ": Stop signal called for "
ldpp_dout(dpp, 0) << __func__ << ": stop signal called for "
<< gc_thread_prefix << worker_id << dendl;
return nullptr;
}
Expand All @@ -133,7 +148,7 @@ int MotrGC::initialize() {
std::string iname = gc_index_prefix + "." + std::to_string(ind_suf);
rc = store->create_motr_idx_by_name(iname);
if (rc < 0 && rc != -EEXIST){
ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl;
ldout(cct, 0) << ": ERROR: GC index creation failed with rc: " << rc << dendl;
break;
}
if (rc == -EEXIST) rc = 0;
Expand All @@ -154,7 +169,7 @@ int MotrGC::initialize() {
rc = kv_lock_provider->initialize(
this, store, global_lock_table);
if (rc < 0) {
ldout(cct, 0) << "ERROR: Failed to initialize lock provider: "
ldout(cct, 0) << "ERROR: failed to initialize lock provider: "
<< rc << dendl;
return rc;
}
Expand Down Expand Up @@ -223,27 +238,97 @@ uint32_t MotrGC::get_max_indices() {
return gc_max_indices;
}

int MotrGC::delete_motr_obj_from_gc(motr_gc_obj_info ginfo) {
int MotrGC::delete_obj_from_gc(motr_gc_obj_info ginfo) {
int rc = delete_motr_obj(ginfo.mobj);
return rc;
}

int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
int rc = 0;
int max_entries = 10000;
int number_of_parts = 0;
int processed_parts = 0;
std::vector<std::string> keys(max_entries);
std::vector<bufferlist> vals(max_entries);

rc = store->next_query_by_name(ginfo.multipart_iname, keys, vals);
if (rc < 0) {
ldout(cct, 0) <<__func__<<": ERROR: next query failed. rc="
<< rc << dendl;
return rc;
}
number_of_parts = rc;
for (const auto& bl: vals) {
if (bl.length() == 0)
break;

RGWUploadPartInfo info;
auto iter = bl.cbegin();
info.decode(iter);
rgw::sal::Attrs attrs_dummy;
decode(attrs_dummy, iter);
Meta mobj;
mobj.decode(iter);
ldout(cct, 20) <<__func__<< ": part_num=" << info.num
<< " part_size=" << info.size << dendl;
std::string tag = PRIx64 + std::to_string(mobj.oid.u_hi) + ":" +
PRIx64 + std::to_string(mobj.oid.u_lo);
std::string part_name = "part.";
char buff[32];
snprintf(buff, sizeof(buff), "%08d", (int)info.num);
part_name.append(buff);

std::string obj_fqdn = ginfo.name + "." + part_name;
motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time,
info.size, info.size_rounded, false, "");
rc = enqueue(gc_obj);
if (rc < 0) {
ldout(cct, 0) <<__func__<< ": ERROR: failed to push "
<< obj_fqdn << "into GC queue " << dendl;
continue;
}
processed_parts++;
bufferlist bl_del;
rc = store->do_idx_op_by_name(ginfo.multipart_iname,
M0_IC_DEL, part_name, bl_del);
if (rc < 0) {
ldout(cct, 0) <<__func__<< ": ERROR: failed to remove part " << part_name
<< " from part index " << ginfo.multipart_iname << dendl;
}
if (std::time(nullptr) > end_time) {
// processing time's up, so return now
return 0;
}
}

if (processed_parts == number_of_parts) {
store->delete_motr_idx_by_name(ginfo.multipart_iname);
}

return rc;
}

int MotrGC::delete_motr_obj(Meta motr_obj) {
int rc;
struct m0_op *op = nullptr;
char fid_str[M0_FID_STR_LEN];
snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&ginfo.mobj.oid));
snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&motr_obj.oid));

if (!ginfo.mobj.oid.u_hi || !ginfo.mobj.oid.u_lo) {
if (!motr_obj.oid.u_hi || !motr_obj.oid.u_lo) {
ldout(cct, 0) <<__func__<< ": invalid motr object oid=" << fid_str << dendl;
return -EINVAL;
}
ldout(cct, 10) <<__func__<< ": deleting motr object oid=" << fid_str << dendl;

// Open the object.
if (ginfo.mobj.layout_id == 0) {
if (motr_obj.layout_id == 0) {
return -ENOENT;
}
auto mobj = new m0_obj();
memset(mobj, 0, sizeof *mobj);
m0_obj_init(mobj, &store->container.co_realm, &ginfo.mobj.oid, store->conf.mc_layout_id);
mobj->ob_attr.oa_layout_id = ginfo.mobj.layout_id;
mobj->ob_attr.oa_pver = ginfo.mobj.pver;
m0_obj_init(mobj, &store->container.co_realm, &motr_obj.oid, store->conf.mc_layout_id);
mobj->ob_attr.oa_layout_id = motr_obj.layout_id;
mobj->ob_attr.oa_pver = motr_obj.pver;
mobj->ob_entity.en_flags |= M0_ENF_META;
rc = m0_entity_open(&mobj->ob_entity, &op);
if (rc != 0) {
Expand Down Expand Up @@ -293,18 +378,17 @@ int MotrGC::delete_motr_obj_from_gc(motr_gc_obj_info ginfo) {
m0_obj_fini(mobj);
delete mobj; mobj = nullptr;
}
ldout(cct, 10) <<__func__<< ": deleted motr object oid="
<< fid_str << " for tag=" << ginfo.tag << dendl;
return 0;
}

int MotrGC::enqueue(motr_gc_obj_info obj) {
int rc = 0;
// create 🔑's:
// - 1_{obj.deletion_time + min_wait_time}
// - 1_{obj.deletion_time}_{obj.tag}
// - 0_{obj.tag}
std::string key1 = obj_exp_time_prefix +
std::to_string(obj.deletion_time + cct->_conf->rgw_gc_obj_min_wait);
std::to_string(obj.deletion_time) + "_" +
obj.tag;
std::string key2 = obj_tag_prefix + obj.tag;

bufferlist bl;
Expand Down Expand Up @@ -337,19 +421,21 @@ int MotrGC::dequeue(std::string iname, motr_gc_obj_info obj) {
bufferlist bl;
std::string tag_key = obj_tag_prefix + obj.tag;
std::string expiry_time_key = obj_exp_time_prefix +
std::to_string(obj.deletion_time + cct->_conf->rgw_gc_obj_min_wait);
std::to_string(obj.deletion_time) +
"_" + obj.tag;
rc = store->do_idx_op_by_name(iname, M0_IC_DEL, tag_key, bl);
if (rc < 0) {
ldout(cct, 0) << "ERROR: failed to delete tag entry "
ldout(cct, 0) << __func__ << ": ERROR: failed to delete tag entry "
<< tag_key << " rc: " << rc << dendl;
}
ldout(cct, 10) << "Deleted tag entry "<< tag_key << dendl;
ldout(cct, 10) << __func__ << ": deleted tag entry " << tag_key << dendl;
rc = store->do_idx_op_by_name(iname, M0_IC_DEL, expiry_time_key, bl);
if (rc < 0 && rc != -EEXIST) {
ldout(cct, 0) << "ERROR: failed to delete time entry "
ldout(cct, 0) << __func__ << ": ERROR: failed to delete time entry "
<< expiry_time_key << " rc: " << rc << dendl;
}
ldout(cct, 10) << "Deleted time entry "<< expiry_time_key << dendl;
ldout(cct, 10) << __func__ << ": deleted time entry "
<< expiry_time_key << dendl;
return rc;
}

Expand All @@ -372,10 +458,11 @@ int MotrGC::get_locked_gc_index(uint32_t& rand_ind,
rc = gc_lock->lock(iname, MotrLockType::EXCLUSIVE,
gc_lease_duration, caller_id);
if (rc < 0) {
ldout(cct, 10) << "Failed to acquire lock: GC Queue =["<< iname << "]"
<< "caller_id =[" << caller_id << "]" << "rc = " << rc << dendl;
ldout(cct, 10) << __func__ << ": failed to acquire lock: GC queue = [" << iname
<< "]" << "caller_id =[" << caller_id << "]" << "rc = " << rc << dendl;
} else {
ldout(cct, 10) << "Acquired lock for GC Queue =["<< iname << "]" << dendl;
ldout(cct, 10) << __func__ << ": acquired lock for GC queue = ["
<< iname << "]" << dendl;
}
}
if (rc == 0)
Expand All @@ -396,7 +483,6 @@ int MotrGC::un_lock_gc_index(uint32_t& index) {
}

int MotrGC::list(std::vector<std::unordered_map<std::string, std::string>> &gc_entries) {
int rc = 0;
int max_entries = 1000;
max_indices = get_max_indices();
for (uint32_t i = 0; i < max_indices; i++) {
Expand All @@ -411,17 +497,17 @@ int MotrGC::list(std::vector<std::unordered_map<std::string, std::string>> &gc_e
if (!marker.empty()) {
keys[0] = marker;
}
rc = store->next_query_by_name(iname, keys, vals, obj_tag_prefix);
int rc = store->next_query_by_name(iname, keys, vals, obj_tag_prefix);
if (rc < 0) {
ldout(cct, 0) <<__func__<<": ERROR: NEXT query failed. rc="
<< rc << dendl;
ldout(cct, 0) << __func__ << ": ERROR: next query failed for " << iname
<< " with rc=" << rc << dendl;
return rc;
}
if (rc == max_entries + 1) {
truncated = true;
marker = keys.back();
}
for (int j = 0; j < max_entries &&
for (int j = 0; j < max_entries &&
!keys[j].empty() && keys[j] != obj_tag_prefix; j++) {
bufferlist::const_iterator blitr = vals[j].cbegin();
motr_gc_obj_info ginfo;
Expand Down
4 changes: 3 additions & 1 deletion src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ class MotrGC : public DoutPrefixProvider {
int dequeue(std::string iname, motr_gc_obj_info obj);

int list(std::vector<std::unordered_map<std::string, std::string>> &gc_entries);
int delete_motr_obj_from_gc(motr_gc_obj_info ginfo);
int delete_obj_from_gc(motr_gc_obj_info ginfo);
int process_parts(motr_gc_obj_info ginfo, std::time_t end_time);
int delete_motr_obj(Meta motr_obj);
int get_locked_gc_index(uint32_t& rand_ind, uint32_t& lease_duration);
int un_lock_gc_index(uint32_t& index);

Expand Down
29 changes: 28 additions & 1 deletion src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2215,7 +2215,34 @@ int MotrObject::remove_mobj_and_index_entry(
if (ent.meta.size != 0) {
if (ent.meta.category == RGWObjCategory::MultiMeta) {
this->set_category(RGWObjCategory::MultiMeta);
rc = this->delete_part_objs(dpp, &size_rounded);
if (store->gc_enabled()) {
ldpp_dout(dpp, 0) <<__func__<< ": trying to put obj into GC=" << dendl;
std::string upload_id;
rc = store->get_upload_id(bucket_name, delete_key, upload_id);
if (rc < 0) {
ldpp_dout(dpp, 0) <<__func__<< ": ERROR: get_upload_id failed. rc=" << rc << dendl;
}
else {
std::string obj_key = delete_key;
obj_key = obj_key.erase(obj_key.size() - 1, 2);
std::string obj_fqdn = bucket_name + "/" + obj_key;
std::string obj_part_iname = "motr.rgw.object." + bucket_name + "." +
obj_key + "." + upload_id + ".parts";
ldpp_dout(dpp, 20) << __func__ << ": object part index=" << obj_part_iname << dendl;
::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta);
motr_gc_obj_info gc_obj(upload_id, obj_fqdn, *mobj, std::time(nullptr),
ent.meta.size, size_rounded, true, obj_part_iname);
rc = store->get_gc()->enqueue(gc_obj);
if (rc == 0) {
pushed_to_gc = true;
ldpp_dout(dpp, 20) << __func__ << ": Pushed the delete req with tag="
<< upload_id << " to the motr garbage collector." << dendl;
}
}
}
if (!pushed_to_gc) {
rc = this->delete_part_objs(dpp, &size_rounded);
}
} else {
// Handling Simple Object Deletion
// Open the object if not already open.
Expand Down

0 comments on commit b172e2b

Please sign in to comment.