From f60009eb85a8db7bd5cd4ee44ba2899a867f071a Mon Sep 17 00:00:00 2001 From: Jeet Jain Date: Tue, 30 Aug 2022 15:22:14 +0530 Subject: [PATCH] rgw: [CORTX-32694] Motr GC Implementation for Multipart Objects (#400) Motr GC producer and consumer module implementation for multipart objects Signed-off-by: Jeet Jain Signed-off-by: Jeet Jain --- src/rgw/motr/gc/gc.cc | 189 ++++++++++++++++++++++++++++++---------- src/rgw/motr/gc/gc.h | 17 ++-- src/rgw/rgw_admin.cc | 1 - src/rgw/rgw_sal_motr.cc | 35 +++++++- 4 files changed, 181 insertions(+), 61 deletions(-) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index a2f2d1eb50e75..47c9336febb38 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -31,7 +31,7 @@ void *MotrGC::GCWorker::entry() { do { std::string gc_log_prefix = "[" + gc_thread_prefix + std::to_string(worker_id) + "] "; - ldpp_dout(dpp, 10) << gc_log_prefix << __func__ << " Iteration Started" << dendl; + ldpp_dout(dpp, 10) << gc_log_prefix << __func__ << ": iteration started" << dendl; std::string iname = ""; // Get lock on an GC index @@ -49,8 +49,8 @@ void *MotrGC::GCWorker::entry() { // form the index name iname = motr_gc->index_names[my_index]; ldpp_dout(dpp, 10) << gc_log_prefix - << __func__ << ": Working on GC Queue: " << iname << dendl; - + << __func__ << ": working on GC queue: " << iname << dendl; + bufferlist bl; std::vector keys(motr_gc->max_count + 1); std::vector vals(motr_gc->max_count + 1); @@ -58,11 +58,11 @@ void *MotrGC::GCWorker::entry() { keys[0] = obj_exp_time_prefix; rc = motr_gc->store->next_query_by_name(iname, keys, vals, obj_exp_time_prefix); - ldpp_dout(dpp, 0) << gc_log_prefix + ldpp_dout(dpp, 20) << gc_log_prefix << __func__ <<": next_query_by_name() rc=" << rc << dendl; if (rc < 0) { // In case of failure, worker will keep retrying till end_time - ldpp_dout(dpp, 0) << gc_log_prefix + ldpp_dout(dpp, 0) << gc_log_prefix << __func__ <<": ERROR: NEXT query failed. 
rc=" << rc << dendl; continue; } @@ -80,26 +80,53 @@ void *MotrGC::GCWorker::entry() { } // delete motr object if(ginfo.is_multipart) { - // handle multipart object deletion + // Handle distribution of parts of a multipart object across available + // GC queues. With a large number of parts, a GC thread may exhaust its + // allotted time; in that case it will be preempted and the remaining parts + // will be processed in the next cycle + rc = motr_gc->process_parts(ginfo, end_time); + if (rc < 0 && rc != -ETIMEDOUT) { + ldpp_dout(dpp, 0) << gc_log_prefix << __func__ + << ": ERROR: failed to distribute multipart object " + << ginfo.name << " with tag " << ginfo.tag << " rc: " << rc << dendl; + continue; // should continue processing next objects + } else if (rc == -ETIMEDOUT) { + ldpp_dout(dpp, 0) << gc_log_prefix << __func__ + << ": processing time's up for current cycle" << dendl; + } else { + ldpp_dout(dpp, 20) << gc_log_prefix + << "successfully distributed multipart object " + << ginfo.name << " with tag " << ginfo.tag << dendl; + } } else { // simple object - rc = motr_gc->delete_motr_obj_from_gc(ginfo); + rc = motr_gc->delete_obj_from_gc(ginfo); if (rc < 0) { ldpp_dout(dpp, 0) << gc_log_prefix - << __func__ <<"ERROR: Motr obj deletion failed for " + << __func__ <<": ERROR: motr obj deletion failed for " << ginfo.tag << " with rc: " << rc << dendl; continue; // should continue deletion for next objects } + ldpp_dout(dpp, 20) << gc_log_prefix + << "successfully deleted object " << ginfo.name + << " with tag " << ginfo.tag << dendl; + } + // delete entry from GC queue unless there is a timeout + if (rc != -ETIMEDOUT) { + rc = motr_gc->dequeue(iname, ginfo); + if (rc == 0) { + ldpp_dout(dpp, 20) << gc_log_prefix + << "successfully deleted GC entry for " << ginfo.name + << " with tag " << ginfo.tag << dendl; + } + processed_count++; } - // delete entry from GC queue - rc = motr_gc->dequeue(iname, ginfo); - processed_count++; - + // Update current time current_time = 
std::time(nullptr); // Exit the loop if required work is complete - if (processed_count >= motr_gc->max_count + if (processed_count >= motr_gc->max_count || current_time > end_time || motr_gc->going_down()) break; } @@ -107,7 +134,7 @@ void *MotrGC::GCWorker::entry() { motr_gc->un_lock_gc_index(my_index); } else { ldpp_dout(dpp, 0) << gc_log_prefix - << __func__ << "ERROR: No GC index is available for locking." + << __func__ << ": ERROR: no GC index is available for locking." << " Skipping GC processing" << " rc = " << rc << dendl; } my_index = (my_index + 1) % motr_gc->max_indices; @@ -119,7 +146,7 @@ void *MotrGC::GCWorker::entry() { } } while (!motr_gc->going_down()); - ldpp_dout(dpp, 0) << __func__ << ": Stop signal called for " + ldpp_dout(dpp, 0) << __func__ << ": stop signal called for " << gc_thread_prefix << worker_id << dendl; return nullptr; } @@ -133,7 +160,7 @@ int MotrGC::initialize() { std::string iname = gc_index_prefix + "." + std::to_string(ind_suf); rc = store->create_motr_idx_by_name(iname); if (rc < 0 && rc != -EEXIST){ - ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl; + ldout(cct, 0) << ": ERROR: GC index creation failed with rc: " << rc << dendl; break; } if (rc == -EEXIST) rc = 0; @@ -154,7 +181,7 @@ int MotrGC::initialize() { rc = kv_lock_provider->initialize( this, store, global_lock_table); if (rc < 0) { - ldout(cct, 0) << "ERROR: Failed to initialize lock provider: " + ldout(cct, 0) << "ERROR: failed to initialize lock provider: " << rc << dendl; return rc; } @@ -223,27 +250,97 @@ uint32_t MotrGC::get_max_indices() { return gc_max_indices; } -int MotrGC::delete_motr_obj_from_gc(motr_gc_obj_info ginfo) { +int MotrGC::delete_obj_from_gc(motr_gc_obj_info ginfo) { + int rc = delete_motr_obj(ginfo.mobj); + return rc; +} + +int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) { + int rc = 0; + int max_entries = 10000; + int number_of_parts = 0; + int processed_parts = 0; + std::vector 
keys(max_entries); + std::vector vals(max_entries); + + rc = store->next_query_by_name(ginfo.multipart_iname, keys, vals); + if (rc < 0) { + ldout(cct, 0) <<__func__<<": ERROR: next query failed. rc=" + << rc << dendl; + return rc; + } + number_of_parts = rc; + for (const auto& bl: vals) { + if (bl.length() == 0) + break; + + RGWUploadPartInfo info; + auto iter = bl.cbegin(); + info.decode(iter); + rgw::sal::Attrs attrs_dummy; + decode(attrs_dummy, iter); + Meta mobj; + mobj.decode(iter); + ldout(cct, 20) <<__func__<< ": part_num=" << info.num + << " part_size=" << info.size << dendl; + std::string tag = PRIx64 + std::to_string(mobj.oid.u_hi) + ":" + + PRIx64 + std::to_string(mobj.oid.u_lo); + std::string part_name = "part."; + char buff[32]; + snprintf(buff, sizeof(buff), "%08d", (int)info.num); + part_name.append(buff); + + std::string obj_fqdn = ginfo.name + "." + part_name; + motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time, + info.size, false, ""); + rc = enqueue(gc_obj); + if (rc < 0) { + ldout(cct, 0) <<__func__<< ": ERROR: failed to push " + << obj_fqdn << "into GC queue " << dendl; + continue; + } + processed_parts++; + bufferlist bl_del; + rc = store->do_idx_op_by_name(ginfo.multipart_iname, + M0_IC_DEL, part_name, bl_del); + if (rc < 0) { + ldout(cct, 0) <<__func__<< ": ERROR: failed to remove part " << part_name + << " from part index " << ginfo.multipart_iname << dendl; + } + if (std::time(nullptr) > end_time || going_down()) { + // processing time's up, so return now + return -ETIMEDOUT; + } + } + + if (processed_parts == number_of_parts) { + store->delete_motr_idx_by_name(ginfo.multipart_iname); + } + + return rc; +} + +int MotrGC::delete_motr_obj(Meta motr_obj) { int rc; struct m0_op *op = nullptr; char fid_str[M0_FID_STR_LEN]; - snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&ginfo.mobj.oid)); + snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&motr_obj.oid)); - if (!ginfo.mobj.oid.u_hi || !ginfo.mobj.oid.u_lo) { + if 
(!motr_obj.oid.u_hi || !motr_obj.oid.u_lo) { ldout(cct, 0) <<__func__<< ": invalid motr object oid=" << fid_str << dendl; return -EINVAL; } ldout(cct, 10) <<__func__<< ": deleting motr object oid=" << fid_str << dendl; // Open the object. - if (ginfo.mobj.layout_id == 0) { + if (motr_obj.layout_id == 0) { return -ENOENT; } auto mobj = new m0_obj(); memset(mobj, 0, sizeof *mobj); - m0_obj_init(mobj, &store->container.co_realm, &ginfo.mobj.oid, store->conf.mc_layout_id); - mobj->ob_attr.oa_layout_id = ginfo.mobj.layout_id; - mobj->ob_attr.oa_pver = ginfo.mobj.pver; + m0_obj_init(mobj, &store->container.co_realm, &motr_obj.oid, store->conf.mc_layout_id); + mobj->ob_attr.oa_layout_id = motr_obj.layout_id; + mobj->ob_attr.oa_pver = motr_obj.pver; mobj->ob_entity.en_flags |= M0_ENF_META; rc = m0_entity_open(&mobj->ob_entity, &op); if (rc != 0) { @@ -293,18 +390,17 @@ int MotrGC::delete_motr_obj_from_gc(motr_gc_obj_info ginfo) { m0_obj_fini(mobj); delete mobj; mobj = nullptr; } - ldout(cct, 10) <<__func__<< ": deleted motr object oid=" - << fid_str << " for tag=" << ginfo.tag << dendl; return 0; } int MotrGC::enqueue(motr_gc_obj_info obj) { int rc = 0; // create 🔑's: - // - 1_{obj.deletion_time + min_wait_time} + // - 1_{obj.deletion_time}_{obj.tag} // - 0_{obj.tag} std::string key1 = obj_exp_time_prefix + - std::to_string(obj.deletion_time + cct->_conf->rgw_gc_obj_min_wait); + std::to_string(obj.deletion_time) + "_" + + obj.tag; std::string key2 = obj_tag_prefix + obj.tag; bufferlist bl; @@ -337,19 +433,21 @@ int MotrGC::dequeue(std::string iname, motr_gc_obj_info obj) { bufferlist bl; std::string tag_key = obj_tag_prefix + obj.tag; std::string expiry_time_key = obj_exp_time_prefix + - std::to_string(obj.deletion_time + cct->_conf->rgw_gc_obj_min_wait); + std::to_string(obj.deletion_time) + + "_" + obj.tag; rc = store->do_idx_op_by_name(iname, M0_IC_DEL, tag_key, bl); if (rc < 0) { - ldout(cct, 0) << "ERROR: failed to delete tag entry " + ldout(cct, 0) << __func__ << ": 
ERROR: failed to delete tag entry " << tag_key << " rc: " << rc << dendl; } - ldout(cct, 10) << "Deleted tag entry "<< tag_key << dendl; + ldout(cct, 10) << __func__ << ": deleted tag entry " << tag_key << dendl; rc = store->do_idx_op_by_name(iname, M0_IC_DEL, expiry_time_key, bl); if (rc < 0 && rc != -EEXIST) { - ldout(cct, 0) << "ERROR: failed to delete time entry " + ldout(cct, 0) << __func__ << ": ERROR: failed to delete time entry " << expiry_time_key << " rc: " << rc << dendl; } - ldout(cct, 10) << "Deleted time entry "<< expiry_time_key << dendl; + ldout(cct, 10) << __func__ << ": deleted time entry " + << expiry_time_key << dendl; return rc; } @@ -372,10 +470,11 @@ int MotrGC::get_locked_gc_index(uint32_t& rand_ind, rc = gc_lock->lock(iname, MotrLockType::EXCLUSIVE, gc_lease_duration, caller_id); if (rc < 0) { - ldout(cct, 10) << "Failed to acquire lock: GC Queue =["<< iname << "]" - << "caller_id =[" << caller_id << "]" << "rc = " << rc << dendl; + ldout(cct, 10) << __func__ << ": failed to acquire lock: GC queue = [" << iname + << "]" << "caller_id =[" << caller_id << "]" << "rc = " << rc << dendl; } else { - ldout(cct, 10) << "Acquired lock for GC Queue =["<< iname << "]" << dendl; + ldout(cct, 10) << __func__ << ": acquired lock for GC queue = [" + << iname << "]" << dendl; } } if (rc == 0) @@ -396,7 +495,6 @@ int MotrGC::un_lock_gc_index(uint32_t& index) { } int MotrGC::list(std::vector> &gc_entries) { - int rc = 0; int max_entries = 1000; max_indices = get_max_indices(); for (uint32_t i = 0; i < max_indices; i++) { @@ -411,17 +509,17 @@ int MotrGC::list(std::vector> &gc_e if (!marker.empty()) { keys[0] = marker; } - rc = store->next_query_by_name(iname, keys, vals, obj_tag_prefix); + int rc = store->next_query_by_name(iname, keys, vals, obj_tag_prefix); if (rc < 0) { - ldout(cct, 0) <<__func__<<": ERROR: NEXT query failed. 
rc=" - << rc << dendl; - return rc; + ldout(cct, 0) << __func__ << ": ERROR: next query failed for " << iname + << " with rc=" << rc << dendl; + continue; } if (rc == max_entries + 1) { truncated = true; marker = keys.back(); } - for (int j = 0; j < max_entries && + for (int j = 0; j < max_entries && !keys[j].empty() && keys[j] != obj_tag_prefix; j++) { bufferlist::const_iterator blitr = vals[j].cbegin(); motr_gc_obj_info ginfo; @@ -429,17 +527,16 @@ int MotrGC::list(std::vector> &gc_e std::unordered_map mp; char t_str[100]; std::string deletion_time; - if (std::strftime(t_str, sizeof(t_str), "%Y-%m-%dT%H:%M:%S%z%Z", std::localtime(&ginfo.deletion_time))) { + if (std::strftime(t_str, sizeof(t_str), "%Y-%m-%dT%H:%M:%S%z%Z", + std::localtime(&ginfo.deletion_time))) { deletion_time = t_str; - } - else { + } else { deletion_time = std::to_string(ginfo.deletion_time); } mp["tag"] = ginfo.tag; mp["name"] = ginfo.name; mp["deletion_time"] = deletion_time; mp["size"] = std::to_string(ginfo.size); - mp["size_actual"] = std::to_string(ginfo.size_actual); mp["is_multipart"] = ginfo.is_multipart ? 
"true" : "false"; gc_entries.push_back(mp); ldout(cct, 70) << ginfo.tag << ", " diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h index cf16489debee0..af8dff5b1e9e3 100644 --- a/src/rgw/motr/gc/gc.h +++ b/src/rgw/motr/gc/gc.h @@ -68,22 +68,19 @@ struct motr_gc_obj_info { Meta mobj; // motr obj std::time_t deletion_time; // time when Motr object was requested for deletion std::uint64_t size; // size of obj - std::uint64_t size_actual; // size of disk bool is_multipart; // flag to indicate if object is multipart std::string multipart_iname; // part index name motr_gc_obj_info() {} motr_gc_obj_info(const std::string& _tag, const std::string& _name, Meta& _mobj, const std::time_t& _deletion_time, const std::uint64_t& _size, - const std::uint64_t& _size_actual, bool _is_multipart, - const std::string& _multipart_iname) + bool _is_multipart, const std::string& _multipart_iname) : tag(_tag), name(_name), mobj(_mobj), deletion_time(_deletion_time), size(_size), - size_actual(_size_actual), is_multipart(_is_multipart), - multipart_iname(_multipart_iname) {} + is_multipart(_is_multipart), multipart_iname(_multipart_iname) {} void encode(bufferlist &bl) const { - ENCODE_START(12, 2, bl); + ENCODE_START(11, 2, bl); encode(tag, bl); encode(name, bl); encode(mobj.oid.u_hi, bl); @@ -93,14 +90,13 @@ struct motr_gc_obj_info { encode(mobj.layout_id, bl); encode(deletion_time, bl); encode(size, bl); - encode(size_actual, bl); encode(is_multipart, bl); encode(multipart_iname, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN_32(12, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN_32(11, 2, 2, bl); decode(tag, bl); decode(name, bl); decode(mobj.oid.u_hi, bl); @@ -110,7 +106,6 @@ struct motr_gc_obj_info { decode(mobj.layout_id, bl); decode(deletion_time, bl); decode(size, bl); - decode(size_actual, bl); decode(is_multipart, bl); decode(multipart_iname, bl); DECODE_FINISH(bl); @@ -172,7 +167,9 @@ class MotrGC : public 
DoutPrefixProvider { int dequeue(std::string iname, motr_gc_obj_info obj); int list(std::vector> &gc_entries); - int delete_motr_obj_from_gc(motr_gc_obj_info ginfo); + int delete_obj_from_gc(motr_gc_obj_info ginfo); + int process_parts(motr_gc_obj_info ginfo, std::time_t end_time); + int delete_motr_obj(Meta motr_obj); int get_locked_gc_index(uint32_t& rand_ind, uint32_t& lease_duration); int un_lock_gc_index(uint32_t& index); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 93207b6aa4484..a344b3e1d3156 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -7828,7 +7828,6 @@ int main(int argc, const char **argv) formatter->dump_string("name", ginfo["name"]); formatter->dump_string("deletion_time", ginfo["deletion_time"]); formatter->dump_string("size", ginfo["size"]); - formatter->dump_string("size_actual", ginfo["size_actual"]); formatter->dump_string("is_multipart", ginfo["is_multipart"]); formatter->close_section(); } diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index 12496873420ad..86c8e841c0122 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -2216,7 +2216,33 @@ int MotrObject::remove_mobj_and_index_entry( if (ent.meta.size != 0) { if (ent.meta.category == RGWObjCategory::MultiMeta) { this->set_category(RGWObjCategory::MultiMeta); - rc = this->delete_part_objs(dpp, &size_rounded); + if (store->gc_enabled()) { + std::string upload_id; + rc = store->get_upload_id(bucket_name, delete_key, upload_id); + if (rc < 0) { + ldpp_dout(dpp, 0) <<__func__<< ": ERROR: get_upload_id failed. rc=" << rc << dendl; + } else { + std::string obj_key = delete_key; + obj_key = obj_key.erase(obj_key.size() - 1, 2); + std::string obj_fqdn = bucket_name + "/" + obj_key; + std::string obj_part_iname = "motr.rgw.object." + bucket_name + "." + + obj_key + "." 
+ upload_id + ".parts"; + ldpp_dout(dpp, 20) << __func__ << ": object part index=" << obj_part_iname << dendl; + ::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta); + motr_gc_obj_info gc_obj(upload_id, obj_fqdn, *mobj, std::time(nullptr), + ent.meta.size, true, obj_part_iname); + rc = store->get_gc()->enqueue(gc_obj); + if (rc == 0) { + pushed_to_gc = true; + ldpp_dout(dpp, 20) << __func__ << ": pushed object " << obj_fqdn + << " with tag " << upload_id + << " to motr garbage collector." << dendl; + } + } + } + if (!pushed_to_gc) { + rc = this->delete_part_objs(dpp, &size_rounded); + } } else { // Handling Simple Object Deletion // Open the object if not already open. @@ -2236,12 +2262,13 @@ int MotrObject::remove_mobj_and_index_entry( std::string obj_fqdn = bucket_name + "/" + delete_key; ::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta); motr_gc_obj_info gc_obj(tag, obj_fqdn, *mobj, std::time(nullptr), - ent.meta.size, size_rounded, false, ""); + ent.meta.size, false, ""); rc = store->get_gc()->enqueue(gc_obj); if (rc == 0) { pushed_to_gc = true; - ldpp_dout(dpp, 20) <<__func__<< ": Pushed the delete req for OID=" - << tag << " to the motr garbage collector." << dendl; + ldpp_dout(dpp, 20) << __func__ << ": pushed object " << obj_fqdn + << " with tag " << tag + << " to motr garbage collector." << dendl; } } if (! pushed_to_gc)