Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

rgw_sal_motr: [CORTX-34181] allow multiparts with offline server #429

Merged
merged 12 commits into from
Sep 19, 2022
12 changes: 2 additions & 10 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,18 +258,16 @@ int MotrGC::delete_obj_from_gc(motr_gc_obj_info ginfo) {
int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
int rc = 0;
int max_entries = 10000;
int number_of_parts = 0;
int processed_parts = 0;
std::vector<std::string> keys(max_entries);
std::vector<bufferlist> vals(max_entries);

keys[0] = ginfo.name;
rc = store->next_query_by_name(ginfo.multipart_iname, keys, vals);
if (rc < 0) {
ldout(cct, 0) <<__func__<<": ERROR: next query failed. rc="
<< rc << dendl;
return rc;
}
number_of_parts = rc;
for (const auto& bl: vals) {
if (bl.length() == 0)
break;
Expand All @@ -290,15 +288,13 @@ int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
part_name.append(buff);

std::string obj_fqdn = ginfo.name + "." + part_name;
motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time,
info.size, false, "");
motr_gc_obj_info gc_obj(tag, obj_fqdn, mobj, ginfo.deletion_time, info.size);
rc = enqueue(gc_obj);
if (rc < 0) {
ldout(cct, 0) <<__func__<< ": ERROR: failed to push "
<< obj_fqdn << "into GC queue " << dendl;
continue;
}
processed_parts++;
bufferlist bl_del;
rc = store->do_idx_op_by_name(ginfo.multipart_iname,
M0_IC_DEL, part_name, bl_del);
Expand All @@ -312,10 +308,6 @@ int MotrGC::process_parts(motr_gc_obj_info ginfo, std::time_t end_time) {
}
}

if (processed_parts == number_of_parts) {
store->delete_motr_idx_by_name(ginfo.multipart_iname);
}

return rc;
}

Expand Down
8 changes: 4 additions & 4 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ struct motr_gc_obj_info {
motr_gc_obj_info() {}
motr_gc_obj_info(const std::string& _tag, const std::string& _name, Meta& _mobj,
const std::time_t& _deletion_time, const std::uint64_t& _size,
bool _is_multipart, const std::string& _multipart_iname)
: tag(_tag), name(_name), mobj(_mobj),
deletion_time(_deletion_time), size(_size),
is_multipart(_is_multipart), multipart_iname(_multipart_iname) {}
const std::string& _multipart_iname="")
: tag(_tag), name(_name), mobj(_mobj), deletion_time(_deletion_time),
size(_size), is_multipart(_multipart_iname != ""),
multipart_iname(_multipart_iname) {}

void encode(bufferlist &bl) const {
ENCODE_START(11, 2, bl);
Expand Down
127 changes: 60 additions & 67 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,14 @@ int MotrBucket::remove_bucket(const DoutPrefixProvider *dpp, bool delete_childre
}

// 3. Remove mp index??
string bucket_multipart_iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
ret = store->delete_motr_idx_by_name(bucket_multipart_iname);
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
ret = store->delete_motr_idx_by_name(iname);
if (ret < 0) {
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: failed to remove multipart.in-progress index rc=" << ret << dendl;
return ret;
}
iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
ret = store->delete_motr_idx_by_name(iname);
if (ret < 0) {
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: failed to remove multipart index rc=" << ret << dendl;
return ret;
Expand Down Expand Up @@ -1096,18 +1102,30 @@ int MotrBucket::create_multipart_indices()
int rc;
string tenant_bkt_name = get_bucket_name(info.bucket.tenant, info.bucket.name);

// Bucket multipart index stores in-progress multipart uploads.
// There are two additional indexes per bucket for multiparts:
// one for in-progress uploads, another one for completed uploads.

// Key is the object name + upload_id, value is a rgw_bucket_dir_entry.
// An entry is inserted when a multipart upload is initialised (
// MotrMultipartUpload::init()) and will be removed when the upload
// is completed (MotrMultipartUpload::complete()).
// MotrBucket::list_multiparts() will scan this index to return all
// in-progress multipart uploads in the bucket.
string bucket_multipart_iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
rc = store->create_motr_idx_by_name(bucket_multipart_iname);
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = store->create_motr_idx_by_name(iname);
if (rc < 0) {
ldout(store->cctx, LOG_ERROR) <<__func__
<< ": ERROR: failed to create bucket in-progress multiparts index "
<< iname << ", rc=" << rc << dendl;
return rc;
}

iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
rc = store->create_motr_idx_by_name(iname);
if (rc < 0) {
ldout(store->cctx, LOG_ERROR) <<__func__ << ": ERROR: failed to create bucket multipart index "
<< bucket_multipart_iname << dendl;
ldout(store->cctx, LOG_ERROR) <<__func__
<< ": ERROR: failed to create bucket multiparts index "
<< iname << ", rc=" << rc << dendl;
return rc;
}

Expand Down Expand Up @@ -1455,7 +1473,7 @@ int MotrBucket::list_multiparts(const DoutPrefixProvider *dpp,
string tenant_bkt_name = get_bucket_name(this->get_tenant(), this->get_name());

string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
key_vec[0].clear();
key_vec[0].assign(marker.begin(), marker.end());
rc = store->next_query_by_name(bucket_multipart_iname, key_vec, val_vec, prefix, delim);
Expand Down Expand Up @@ -2212,7 +2230,8 @@ int MotrObject::MotrDeleteOp::create_delete_marker(const DoutPrefixProvider* dpp
int MotrObject::remove_mobj_and_index_entry(
const DoutPrefixProvider* dpp, rgw_bucket_dir_entry& ent,
std::string delete_key, std::string bucket_index_iname,
std::string bucket_name) {
std::string bucket_name)
{
int rc;
bufferlist bl;
uint64_t size_rounded = 0;
Expand All @@ -2228,14 +2247,12 @@ int MotrObject::remove_mobj_and_index_entry(
if (rc < 0) {
ldpp_dout(dpp, 0) <<__func__<< ": ERROR: get_upload_id failed. rc=" << rc << dendl;
} else {
std::string obj_fqdn = bucket_name + "/" + delete_key;
std::string obj_part_iname = "motr.rgw.object." + bucket_name + "." +
this->get_name() + "." + upload_id +
".parts";
ldpp_dout(dpp, 20) << __func__ << ": object part index=" << obj_part_iname << dendl;
std::string obj_fqdn = this->get_name() + "." + upload_id;
std::string iname = "motr.rgw.bucket." + bucket_name + ".multiparts";
ldpp_dout(dpp, 20) << __func__ << ": object part index=" << iname << dendl;
::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta);
motr_gc_obj_info gc_obj(upload_id, obj_fqdn, *mobj, std::time(nullptr),
ent.meta.size, true, obj_part_iname);
ent.meta.size, iname);
rc = store->get_gc()->enqueue(gc_obj);
if (rc == 0) {
pushed_to_gc = true;
Expand Down Expand Up @@ -2266,7 +2283,7 @@ int MotrObject::remove_mobj_and_index_entry(
std::string obj_fqdn = bucket_name + "/" + delete_key;
::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta);
motr_gc_obj_info gc_obj(tag, obj_fqdn, *mobj, std::time(nullptr),
ent.meta.size, false, "");
ent.meta.size);
rc = store->get_gc()->enqueue(gc_obj);
if (rc == 0) {
pushed_to_gc = true;
Expand Down Expand Up @@ -2697,10 +2714,9 @@ int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz)
}
expected_obj_size = sz;
chunk_io_sz = expected_obj_size;
if (expected_obj_size > MAX_ACC_SIZE) {
if (expected_obj_size > MAX_ACC_SIZE)
// Cap it to MAX_ACC_SIZE
chunk_io_sz = MAX_ACC_SIZE;
}

ldpp_dout(dpp, 20) <<__func__ << ": key=" << this->get_key().to_str()
<< " size=" << sz << " meta:oid=[0x" << std::hex
Expand Down Expand Up @@ -2928,11 +2944,10 @@ int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& in_buffer
available_data = io_ctxt.total_bufer_sz;
}
bs = this->get_optimal_bs(chunk_io_sz);
if (bs < chunk_io_sz) {
if (bs < chunk_io_sz)
chunk_io_sz = bs;
}
int64_t remaining_bytes =
expected_obj_size - processed_bytes;

int64_t remaining_bytes = expected_obj_size - processed_bytes;
// Check if this is the last io of the original object size
if (remaining_bytes <= 0)
last_io = true;
Expand Down Expand Up @@ -3975,10 +3990,7 @@ int MotrMultipartUpload::delete_parts(const DoutPrefixProvider *dpp, std::string
<< ", rc=" << rc << dendl;
}

// Delete object part index.
string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." + mp_obj.get_key() +
"." + upload_id + ".parts";
return store->delete_motr_idx_by_name(obj_part_iname);
return rc;
}

int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
Expand All @@ -3991,7 +4003,7 @@ int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
meta_obj = get_meta_obj();
string tenant_bkt_name = get_bucket_name(meta_obj->get_bucket()->get_tenant(), meta_obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_GET, meta_obj->get_key().to_str(), bl);
if (rc < 0) {
Expand Down Expand Up @@ -4052,6 +4064,7 @@ int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,

owner = _owner;

string tenant_bkt_name = get_bucket_name(bucket->get_tenant(), bucket->get_name());
do {
char buf[33];
string tmp_obj_name;
Expand Down Expand Up @@ -4097,9 +4110,8 @@ int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
encode(attrs, bl);
// Insert an entry into bucket multipart index so it is not shown
// when listing a bucket.
string tenant_bkt_name = get_bucket_name(obj->get_bucket()->get_tenant(), obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_PUT, obj->get_key().to_str(), bl);
} while (rc == -EEXIST);
Expand All @@ -4108,22 +4120,6 @@ int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: index opration failed, M0_IC_PUT rc="<< rc << dendl;
return rc;
}
string tenant_bkt_name = get_bucket_name(bucket->get_tenant(), bucket->get_name());
//This is multipart init, you will always have upload id here.
string upload_id = get_upload_id();

// Create object part index.
string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." + mp_obj.get_key() +
"." + upload_id + ".parts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << obj_part_iname << dendl;
rc = store->create_motr_idx_by_name(obj_part_iname);
if (rc == -EEXIST)
rc = 0;
if (rc < 0) {
// TODO: clean the bucket index entry
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: Failed to create object multipart index " << obj_part_iname << dendl;
return rc;
}

// Add one to the object_count of the current bucket stats
// Size will be added when parts are uploaded
Expand Down Expand Up @@ -4174,15 +4170,14 @@ int MotrMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *
}
}

string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." + mp_obj.get_key() +
"." + upload_id + ".parts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << obj_part_iname << dendl;
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << iname << dendl;
key_vec[0].clear();
key_vec[0] = "part.";
key_vec[0] = mp_obj.get_key() + "." + upload_id;
char buf[32];
snprintf(buf, sizeof(buf), "%08d", marker + 1);
snprintf(buf, sizeof(buf), ".%08d", marker + 1);
key_vec[0].append(buf);
rc = store->next_query_by_name(obj_part_iname, key_vec, val_vec);
rc = store->next_query_by_name(iname, key_vec, val_vec);
andriytk marked this conversation as resolved.
Show resolved Hide resolved
if (rc < 0) {
ldpp_dout(dpp, LOG_ERROR) <<__func__ << ": ERROR: NEXT query failed. rc=" << rc << dendl;
return rc;
Expand Down Expand Up @@ -4377,7 +4372,7 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
meta_obj = get_meta_obj();
string tenant_bkt_name = get_bucket_name(meta_obj->get_bucket()->get_tenant(), meta_obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_GET, meta_obj->get_key().to_str(), bl);
ldpp_dout(dpp, 20) <<__func__ << ": read entry from bucket multipart index rc=" << rc << dendl;
Expand Down Expand Up @@ -4481,7 +4476,7 @@ int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield
bufferlist bl;
string tenant_bkt_name = get_bucket_name(meta_obj->get_bucket()->get_tenant(), meta_obj->get_bucket()->get_name());
string bucket_multipart_iname =
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
"motr.rgw.bucket." + tenant_bkt_name + ".multiparts.in-progress";
int rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
M0_IC_GET, meta_obj->get_key().to_str(), bl);
if (rc < 0) {
Expand Down Expand Up @@ -4608,21 +4603,19 @@ int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag
encode(attrs, bl);
part_obj->meta.encode(bl);

string p = "part.";
char buf[32];
snprintf(buf, sizeof(buf), "%08d", (int)part_num);
p.append(buf);
string tenant_bkt_name = get_bucket_name(head_obj->get_bucket()->get_tenant(), head_obj->get_bucket()->get_name());
andriytk marked this conversation as resolved.
Show resolved Hide resolved
//This is a MultipartComplete operation so this should always have valid upload id.
string upload_id_str = upload_id;
string obj_part_iname = "motr.rgw.object." + tenant_bkt_name + "." +
head_obj->get_key().to_str() + "." + upload_id_str + ".parts";
ldpp_dout(dpp, 20) <<__func__ << ": object part index=" << obj_part_iname << dendl;
string part = head_obj->get_name() + "." + upload_id;
char buf[32];
snprintf(buf, sizeof(buf), ".%08d", (int)part_num);
part.append(buf);

// Before updating object part index with entry for new part, check if
// old part exists. Perform M0_IC_GET operation on object part index.
string tenant_bkt_name = get_bucket_name(head_obj->get_bucket()->get_tenant(),
head_obj->get_bucket()->get_name());
string iname = "motr.rgw.bucket." + tenant_bkt_name + ".multiparts";
bufferlist old_part_check_bl;
rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_GET, p, old_part_check_bl);
rc = store->do_idx_op_by_name(iname, M0_IC_GET, part, old_part_check_bl);
if (rc == 0 && old_part_check_bl.length() > 0) {
// Old part exists. Try to delete it.
RGWUploadPartInfo old_part_info;
Expand All @@ -4631,7 +4624,7 @@ int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag
head_obj->get_key().to_str() +
".part." + std::to_string(part_num);
std::unique_ptr<MotrObject> old_part_obj =
std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name),head_obj->get_bucket());
std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name), head_obj->get_bucket());
if (old_part_obj == nullptr)
return -ENOMEM;

Expand All @@ -4648,14 +4641,14 @@ int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag
// Delete old object
rc = old_mobj->delete_mobj(dpp);
if (rc == 0) {
ldpp_dout(dpp, 20) <<__func__ << ": Old part [" << p << "] deleted succesfully" << dendl;
ldpp_dout(dpp, 20) <<__func__ << ": Old part [" << part << "] deleted succesfully" << dendl;
} else {
ldpp_dout(dpp, 0) <<__func__ << ": Failed to delete old part [" << p << "]. Error=" << rc << dendl;
ldpp_dout(dpp, 0) <<__func__ << ": Failed to delete old part [" << part << "], rc=" << rc << dendl;
return rc;
}
}

rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_PUT, p, bl);
rc = store->do_idx_op_by_name(iname, M0_IC_PUT, part, bl);
if (rc < 0) {
ldpp_dout(dpp, 0) <<__func__ << ": failed to add part obj in part index, rc=" << rc << dendl;
return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class MotrObject : public Object {
uint64_t part_num;
// Object size as available from Content-Length header
uint64_t expected_obj_size = 0;
uint64_t chunk_io_sz = 0;
int64_t chunk_io_sz = 0;
// Total Number of bytes processed so far
uint64_t processed_bytes = 0;
struct AccumulateIOCtxt io_ctxt = {};
Expand Down