Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-7136 object: merged recxs during punch migration #5300

Merged
merged 2 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 152 additions & 141 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1487,16 +1487,152 @@ migrate_one_ult(void *arg)
}
}

/* If src_iod is NULL, it will try to merge the recxs inside dst_iod */
static int
migrate_one_iod_merge_recx(daos_unit_oid_t oid, daos_iod_t *dst_iod,
daos_iod_t *src_iod);
migrate_merge_iod_recx(daos_iod_t *dst_iod, daos_iod_t *src_iod)
{
struct obj_auxi_list_recx *recx;
struct obj_auxi_list_recx *tmp;
daos_recx_t *recxs;
d_list_t merge_list;
int nr_recxs = 0;
int i;
int rc = 0;

D_INIT_LIST_HEAD(&merge_list);
if (src_iod != NULL) {
recxs = src_iod->iod_recxs;
for (i = 0; i < src_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "src merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx,
recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}
}

D_ASSERT(dst_iod != NULL);
recxs = dst_iod->iod_recxs;
for (i = 0; i < dst_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "dst merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx, recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}

d_list_for_each_entry(recx, &merge_list, recx_list)
nr_recxs++;

if (nr_recxs > dst_iod->iod_nr) {
D_ALLOC_ARRAY(recxs, nr_recxs);
if (recxs == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
recxs = dst_iod->iod_recxs;
}

i = 0;
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
recxs[i++] = recx->recx;

D_DEBUG(DB_REBUILD, "merge recx "DF_U64"/"DF_U64"\n",
recx->recx.rx_idx, recx->recx.rx_nr);
d_list_del(&recx->recx_list);
D_FREE(recx);
}

if (dst_iod->iod_recxs != recxs)
D_FREE(dst_iod->iod_recxs);

dst_iod->iod_recxs = recxs;
dst_iod->iod_nr = i;
out:
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
d_list_del(&recx->recx_list);
D_FREE(recx);
}
return rc;
}

static int
migrate_iod_sgl_add(daos_iod_t *iods, uint32_t *iods_num, daos_iod_t *new_iod,
d_sg_list_t *sgls, d_sg_list_t *new_sgl)
{
daos_iod_t *dst_iod;
int rc = 0;
int i;

for (i = 0; i < *iods_num; i++) {
if (daos_iov_cmp(&iods[i].iod_name, &new_iod->iod_name))
break;
}

/* None duplicate iods, let's create new one */
if (i == *iods_num) {
rc = daos_iod_copy(&iods[i], new_iod);
if (rc)
return rc;

if (new_sgl) {
rc = daos_sgl_alloc_copy_data(&sgls[i], new_sgl);
if (rc) {
daos_iov_free(&iods[i].iod_name);
return rc;
}
}

if (new_iod->iod_type == DAOS_IOD_SINGLE) {
iods[i].iod_recxs = NULL;
} else {
/**
* recx in iod has been reused, i.e. it will be
* freed with mrone, so let's set iod_recxs in
* iod to be NULL to avoid it is being freed
* with iod afterwards.
*/
new_iod->iod_recxs = NULL;
}
D_DEBUG(DB_REBUILD, "add new akey "DF_KEY" at %d\n",
DP_KEY(&new_iod->iod_name), i);
(*iods_num)++;
return rc;
}

/* Try to merge the iods to the existent IOD */
dst_iod = &iods[i];
if (dst_iod->iod_size != new_iod->iod_size ||
dst_iod->iod_type != new_iod->iod_type) {
D_ERROR(DF_KEY" dst_iod size "DF_U64" != "DF_U64
" dst_iod type %d != %d\n",
DP_KEY(&new_iod->iod_name), dst_iod->iod_size,
new_iod->iod_size, dst_iod->iod_type,
new_iod->iod_type);
D_GOTO(out, rc);
}

rc = migrate_merge_iod_recx(dst_iod, new_iod);
if (rc)
D_GOTO(out, rc);

if (new_sgl) {
rc = daos_sgl_merge(&sgls[i], new_sgl);
if (rc)
D_GOTO(out, rc);
}

D_DEBUG(DB_REBUILD, "Merge akey "DF_KEY" to %d\n",
DP_KEY(&new_iod->iod_name), i);
out:
return rc;
}

static int
rw_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, d_sg_list_t *sgl)
{
uint64_t total_size = 0;
int rec_cnt = 0;
bool free_recxs = true;
int i;
int rc;

Expand All @@ -1509,72 +1645,24 @@ rw_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, d_sg_list_t *sgl)
return -DER_NOMEM;
}

for (i = 0; i < mrone->mo_iod_num; i++) {
if (daos_iov_cmp(&mrone->mo_iods[i].iod_name,
&iod->iod_name))
break;
}

if (i < mrone->mo_iod_num) {
daos_iod_t *dst_iod = &mrone->mo_iods[i];

/*merge the iods to the existent IOD */
D_DEBUG(DB_REBUILD, "Merge akey "DF_KEY" to %d\n",
DP_KEY(&iod->iod_name), i);
if (dst_iod->iod_size != iod->iod_size ||
dst_iod->iod_type != iod->iod_type) {
D_ERROR(DF_KEY" dst_iod size "DF_U64" != "DF_U64
" dst_iod type %d != %d\n",
DP_KEY(&iod->iod_name), dst_iod->iod_size,
iod->iod_size, dst_iod->iod_type,
iod->iod_type);
D_GOTO(out, rc = -DER_INVAL);
}

rc = migrate_one_iod_merge_recx(mrone->mo_oid, dst_iod, iod);
if (rc)
D_GOTO(out, rc);

if (sgl) {
rc = daos_sgl_merge(&mrone->mo_sgls[i], sgl);
if (rc)
D_GOTO(out, rc);
}
} else {
rc = daos_iod_copy(&mrone->mo_iods[i], iod);
if (rc)
D_GOTO(out, rc);

free_recxs = false;
mrone->mo_iod_num++;
if (sgl) {
rc = daos_sgl_alloc_copy_data(&mrone->mo_sgls[i], sgl);
if (rc)
D_GOTO(out, rc);
}
}

if (iod->iod_type == DAOS_IOD_SINGLE) {
mrone->mo_iods[i].iod_recxs = NULL;
rec_cnt = 1;
total_size = iod->iod_size;
D_DEBUG(DB_REBUILD, "single recx "DF_U64"\n", total_size);
} else {
for (i = 0; i < iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "recx "DF_U64"/"DF_U64"\n",
D_DEBUG(DB_REBUILD, "new recx "DF_U64"/"DF_U64"\n",
iod->iod_recxs[i].rx_idx,
iod->iod_recxs[i].rx_nr);
rec_cnt += iod->iod_recxs[i].rx_nr;
total_size += iod->iod_recxs[i].rx_nr * iod->iod_size;
}
}

/**
* If free_recxs is true, then the caller needs to free iod_recxs later,
* so do not set iod_recxs to NULL here, otherwise this mrone will
* reuse recxs.
*/
if (!free_recxs)
iod->iod_recxs = NULL;
rc = migrate_iod_sgl_add(mrone->mo_iods, &mrone->mo_iod_num, iod,
mrone->mo_sgls, sgl);
if (rc != 0)
D_GOTO(out, rc);

D_DEBUG(DB_REBUILD,
"idx %d akey "DF_KEY" nr %d size "DF_U64" type %d rec %d total "
Expand Down Expand Up @@ -1602,9 +1690,10 @@ punch_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, daos_epoch_t eph)
return -DER_NOMEM;
}

rc = daos_iod_copy(&mrone->mo_punch_iods[idx], iod);
if (rc)
return rc;
rc = migrate_iod_sgl_add(mrone->mo_punch_iods, &mrone->mo_punch_iod_num,
iod, NULL, NULL);
if (rc != 0)
D_GOTO(out, rc);

D_DEBUG(DB_TRACE,
"idx %d akey "DF_KEY" nr %d size "DF_U64" type %d\n",
Expand All @@ -1613,83 +1702,7 @@ punch_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, daos_epoch_t eph)

if (mrone->mo_rec_punch_eph < eph)
mrone->mo_rec_punch_eph = eph;

mrone->mo_punch_iod_num++;
iod->iod_recxs = NULL;
return 0;
}

static int
migrate_one_iod_merge_recx(daos_unit_oid_t oid, daos_iod_t *dst_iod,
daos_iod_t *src_iod)
{
struct obj_auxi_list_recx *recx;
struct obj_auxi_list_recx *tmp;
struct daos_oclass_attr *oca;
daos_recx_t *recxs;
d_list_t merge_list;
int nr_recxs = 0;
int i;
int rc = 0;

oca = daos_oclass_attr_find(oid.id_pub);
if (oca == NULL)
return -DER_NONEXIST;

D_INIT_LIST_HEAD(&merge_list);
if (src_iod != NULL) {
recxs = src_iod->iod_recxs;
for (i = 0; i < src_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "src merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx,
recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}
}

D_ASSERT(dst_iod != NULL);
recxs = dst_iod->iod_recxs;
for (i = 0; i < dst_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "dst merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx, recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}

d_list_for_each_entry(recx, &merge_list, recx_list)
nr_recxs++;

if (nr_recxs > dst_iod->iod_nr) {
D_ALLOC_ARRAY(recxs, nr_recxs);
if (recxs == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
recxs = dst_iod->iod_recxs;
}

i = 0;
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
recxs[i++] = recx->recx;

D_DEBUG(DB_REBUILD, "merge recx "DF_U64"/"DF_U64"\n",
recx->recx.rx_idx, recx->recx.rx_nr);
d_list_del(&recx->recx_list);
D_FREE(recx);
}

if (dst_iod->iod_recxs != recxs)
D_FREE(dst_iod->iod_recxs);

dst_iod->iod_recxs = recxs;
dst_iod->iod_nr = i;
out:
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
d_list_del(&recx->recx_list);
D_FREE(recx);
}
return rc;
}

Expand Down Expand Up @@ -1718,9 +1731,8 @@ migrate_one_merge(struct migrate_one *mo, struct dss_enum_unpack_io *io)
&io->ui_iods[i].iod_name))
continue;
if (mo->mo_iods[j].iod_type == DAOS_IOD_ARRAY) {
rc = migrate_one_iod_merge_recx(io->ui_oid,
&mo->mo_iods[j],
&io->ui_iods[i]);
rc = migrate_merge_iod_recx(&mo->mo_iods[j],
&io->ui_iods[i]);
if (rc)
D_GOTO(out, rc);

Expand Down Expand Up @@ -1905,8 +1917,7 @@ migrate_enum_unpack_cb(struct dss_enum_unpack_io *io, void *data)
* some duplicate recxs due to replication/parity
* space. let's remove them.
*/
rc = migrate_one_iod_merge_recx(io->ui_oid,
iod, NULL);
rc = migrate_merge_iod_recx(iod, NULL);
if (rc)
return rc;

Expand Down
Loading