Skip to content

Commit

Permalink
DAOS-7136 object: merged recxs during punch migration (#5300)
Browse files Browse the repository at this point in the history
Merged punched recxs under same akey to avoid the
failure of punched recxs migration.

Signed-off-by: Di Wang <di.wang@intel.com>
  • Loading branch information
wangdi committed Apr 9, 2021
1 parent 623955b commit 4b10d24
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 141 deletions.
293 changes: 152 additions & 141 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1487,16 +1487,152 @@ migrate_one_ult(void *arg)
}
}

/* If src_iod is NULL, it will try to merge the recxs inside dst_iod */
static int
migrate_one_iod_merge_recx(daos_unit_oid_t oid, daos_iod_t *dst_iod,
daos_iod_t *src_iod);
migrate_merge_iod_recx(daos_iod_t *dst_iod, daos_iod_t *src_iod)
{
struct obj_auxi_list_recx *recx;
struct obj_auxi_list_recx *tmp;
daos_recx_t *recxs;
d_list_t merge_list;
int nr_recxs = 0;
int i;
int rc = 0;

D_INIT_LIST_HEAD(&merge_list);
if (src_iod != NULL) {
recxs = src_iod->iod_recxs;
for (i = 0; i < src_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "src merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx,
recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}
}

D_ASSERT(dst_iod != NULL);
recxs = dst_iod->iod_recxs;
for (i = 0; i < dst_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "dst merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx, recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}

d_list_for_each_entry(recx, &merge_list, recx_list)
nr_recxs++;

if (nr_recxs > dst_iod->iod_nr) {
D_ALLOC_ARRAY(recxs, nr_recxs);
if (recxs == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
recxs = dst_iod->iod_recxs;
}

i = 0;
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
recxs[i++] = recx->recx;

D_DEBUG(DB_REBUILD, "merge recx "DF_U64"/"DF_U64"\n",
recx->recx.rx_idx, recx->recx.rx_nr);
d_list_del(&recx->recx_list);
D_FREE(recx);
}

if (dst_iod->iod_recxs != recxs)
D_FREE(dst_iod->iod_recxs);

dst_iod->iod_recxs = recxs;
dst_iod->iod_nr = i;
out:
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
d_list_del(&recx->recx_list);
D_FREE(recx);
}
return rc;
}

static int
migrate_iod_sgl_add(daos_iod_t *iods, uint32_t *iods_num, daos_iod_t *new_iod,
d_sg_list_t *sgls, d_sg_list_t *new_sgl)
{
daos_iod_t *dst_iod;
int rc = 0;
int i;

for (i = 0; i < *iods_num; i++) {
if (daos_iov_cmp(&iods[i].iod_name, &new_iod->iod_name))
break;
}

/* None duplicate iods, let's create new one */
if (i == *iods_num) {
rc = daos_iod_copy(&iods[i], new_iod);
if (rc)
return rc;

if (new_sgl) {
rc = daos_sgl_alloc_copy_data(&sgls[i], new_sgl);
if (rc) {
daos_iov_free(&iods[i].iod_name);
return rc;
}
}

if (new_iod->iod_type == DAOS_IOD_SINGLE) {
iods[i].iod_recxs = NULL;
} else {
/**
* recx in iod has been reused, i.e. it will be
* freed with mrone, so let's set iod_recxs in
* iod to be NULL to avoid it is being freed
* with iod afterwards.
*/
new_iod->iod_recxs = NULL;
}
D_DEBUG(DB_REBUILD, "add new akey "DF_KEY" at %d\n",
DP_KEY(&new_iod->iod_name), i);
(*iods_num)++;
return rc;
}

/* Try to merge the iods to the existent IOD */
dst_iod = &iods[i];
if (dst_iod->iod_size != new_iod->iod_size ||
dst_iod->iod_type != new_iod->iod_type) {
D_ERROR(DF_KEY" dst_iod size "DF_U64" != "DF_U64
" dst_iod type %d != %d\n",
DP_KEY(&new_iod->iod_name), dst_iod->iod_size,
new_iod->iod_size, dst_iod->iod_type,
new_iod->iod_type);
D_GOTO(out, rc);
}

rc = migrate_merge_iod_recx(dst_iod, new_iod);
if (rc)
D_GOTO(out, rc);

if (new_sgl) {
rc = daos_sgl_merge(&sgls[i], new_sgl);
if (rc)
D_GOTO(out, rc);
}

D_DEBUG(DB_REBUILD, "Merge akey "DF_KEY" to %d\n",
DP_KEY(&new_iod->iod_name), i);
out:
return rc;
}

static int
rw_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, d_sg_list_t *sgl)
{
uint64_t total_size = 0;
int rec_cnt = 0;
bool free_recxs = true;
int i;
int rc;

Expand All @@ -1509,72 +1645,24 @@ rw_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, d_sg_list_t *sgl)
return -DER_NOMEM;
}

for (i = 0; i < mrone->mo_iod_num; i++) {
if (daos_iov_cmp(&mrone->mo_iods[i].iod_name,
&iod->iod_name))
break;
}

if (i < mrone->mo_iod_num) {
daos_iod_t *dst_iod = &mrone->mo_iods[i];

/*merge the iods to the existent IOD */
D_DEBUG(DB_REBUILD, "Merge akey "DF_KEY" to %d\n",
DP_KEY(&iod->iod_name), i);
if (dst_iod->iod_size != iod->iod_size ||
dst_iod->iod_type != iod->iod_type) {
D_ERROR(DF_KEY" dst_iod size "DF_U64" != "DF_U64
" dst_iod type %d != %d\n",
DP_KEY(&iod->iod_name), dst_iod->iod_size,
iod->iod_size, dst_iod->iod_type,
iod->iod_type);
D_GOTO(out, rc = -DER_INVAL);
}

rc = migrate_one_iod_merge_recx(mrone->mo_oid, dst_iod, iod);
if (rc)
D_GOTO(out, rc);

if (sgl) {
rc = daos_sgl_merge(&mrone->mo_sgls[i], sgl);
if (rc)
D_GOTO(out, rc);
}
} else {
rc = daos_iod_copy(&mrone->mo_iods[i], iod);
if (rc)
D_GOTO(out, rc);

free_recxs = false;
mrone->mo_iod_num++;
if (sgl) {
rc = daos_sgl_alloc_copy_data(&mrone->mo_sgls[i], sgl);
if (rc)
D_GOTO(out, rc);
}
}

if (iod->iod_type == DAOS_IOD_SINGLE) {
mrone->mo_iods[i].iod_recxs = NULL;
rec_cnt = 1;
total_size = iod->iod_size;
D_DEBUG(DB_REBUILD, "single recx "DF_U64"\n", total_size);
} else {
for (i = 0; i < iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "recx "DF_U64"/"DF_U64"\n",
D_DEBUG(DB_REBUILD, "new recx "DF_U64"/"DF_U64"\n",
iod->iod_recxs[i].rx_idx,
iod->iod_recxs[i].rx_nr);
rec_cnt += iod->iod_recxs[i].rx_nr;
total_size += iod->iod_recxs[i].rx_nr * iod->iod_size;
}
}

/**
* If free_recxs is true, then the caller needs to free iod_recxs later,
* so do not set iod_recxs to NULL here, otherwise this mrone will
* reuse recxs.
*/
if (!free_recxs)
iod->iod_recxs = NULL;
rc = migrate_iod_sgl_add(mrone->mo_iods, &mrone->mo_iod_num, iod,
mrone->mo_sgls, sgl);
if (rc != 0)
D_GOTO(out, rc);

D_DEBUG(DB_REBUILD,
"idx %d akey "DF_KEY" nr %d size "DF_U64" type %d rec %d total "
Expand Down Expand Up @@ -1602,9 +1690,10 @@ punch_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, daos_epoch_t eph)
return -DER_NOMEM;
}

rc = daos_iod_copy(&mrone->mo_punch_iods[idx], iod);
if (rc)
return rc;
rc = migrate_iod_sgl_add(mrone->mo_punch_iods, &mrone->mo_punch_iod_num,
iod, NULL, NULL);
if (rc != 0)
D_GOTO(out, rc);

D_DEBUG(DB_TRACE,
"idx %d akey "DF_KEY" nr %d size "DF_U64" type %d\n",
Expand All @@ -1613,83 +1702,7 @@ punch_iod_pack(struct migrate_one *mrone, daos_iod_t *iod, daos_epoch_t eph)

if (mrone->mo_rec_punch_eph < eph)
mrone->mo_rec_punch_eph = eph;

mrone->mo_punch_iod_num++;
iod->iod_recxs = NULL;
return 0;
}

static int
migrate_one_iod_merge_recx(daos_unit_oid_t oid, daos_iod_t *dst_iod,
daos_iod_t *src_iod)
{
struct obj_auxi_list_recx *recx;
struct obj_auxi_list_recx *tmp;
struct daos_oclass_attr *oca;
daos_recx_t *recxs;
d_list_t merge_list;
int nr_recxs = 0;
int i;
int rc = 0;

oca = daos_oclass_attr_find(oid.id_pub);
if (oca == NULL)
return -DER_NONEXIST;

D_INIT_LIST_HEAD(&merge_list);
if (src_iod != NULL) {
recxs = src_iod->iod_recxs;
for (i = 0; i < src_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "src merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx,
recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}
}

D_ASSERT(dst_iod != NULL);
recxs = dst_iod->iod_recxs;
for (i = 0; i < dst_iod->iod_nr; i++) {
D_DEBUG(DB_REBUILD, "dst merge "DF_U64"/"DF_U64"\n",
recxs[i].rx_idx, recxs[i].rx_nr);
rc = merge_recx(&merge_list, recxs[i].rx_idx, recxs[i].rx_nr);
if (rc)
D_GOTO(out, rc);
}

d_list_for_each_entry(recx, &merge_list, recx_list)
nr_recxs++;

if (nr_recxs > dst_iod->iod_nr) {
D_ALLOC_ARRAY(recxs, nr_recxs);
if (recxs == NULL)
D_GOTO(out, rc = -DER_NOMEM);
} else {
recxs = dst_iod->iod_recxs;
}

i = 0;
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
recxs[i++] = recx->recx;

D_DEBUG(DB_REBUILD, "merge recx "DF_U64"/"DF_U64"\n",
recx->recx.rx_idx, recx->recx.rx_nr);
d_list_del(&recx->recx_list);
D_FREE(recx);
}

if (dst_iod->iod_recxs != recxs)
D_FREE(dst_iod->iod_recxs);

dst_iod->iod_recxs = recxs;
dst_iod->iod_nr = i;
out:
d_list_for_each_entry_safe(recx, tmp, &merge_list, recx_list) {
d_list_del(&recx->recx_list);
D_FREE(recx);
}
return rc;
}

Expand Down Expand Up @@ -1718,9 +1731,8 @@ migrate_one_merge(struct migrate_one *mo, struct dss_enum_unpack_io *io)
&io->ui_iods[i].iod_name))
continue;
if (mo->mo_iods[j].iod_type == DAOS_IOD_ARRAY) {
rc = migrate_one_iod_merge_recx(io->ui_oid,
&mo->mo_iods[j],
&io->ui_iods[i]);
rc = migrate_merge_iod_recx(&mo->mo_iods[j],
&io->ui_iods[i]);
if (rc)
D_GOTO(out, rc);

Expand Down Expand Up @@ -1905,8 +1917,7 @@ migrate_enum_unpack_cb(struct dss_enum_unpack_io *io, void *data)
* some duplicate recxs due to replication/parity
* space. let's remove them.
*/
rc = migrate_one_iod_merge_recx(io->ui_oid,
iod, NULL);
rc = migrate_merge_iod_recx(iod, NULL);
if (rc)
return rc;

Expand Down
Loading

0 comments on commit 4b10d24

Please sign in to comment.