Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-5279 rebuild: add rebuild inflight control (#4656) #5172

Merged
merged 1 commit into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ struct migrate_pool_tls {

/* reference count for the structure */
uint64_t mpt_refcount;

/* Accounting for the currently inflight iods, mainly used to throttle
* the rebuild inflight rate and avoid overflowing the DMA buffer.
*/
uint64_t mpt_inflight_size;
uint64_t mpt_inflight_max_size;
ABT_cond mpt_inflight_cond;
ABT_mutex mpt_inflight_mutex;
int mpt_inflight_max_ult;
/* migrate leader ULT */
unsigned int mpt_ult_running:1,
/* Indicates whether objects on the migration destination should be
Expand Down
116 changes: 106 additions & 10 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
#pragma GCC diagnostic ignored "-Wframe-larger-than="
#endif

/* Upper bound on the inflight migration data size per xstream (256 MiB).
 * The total inflight size is set to 25% of the max DMA size for the
 * moment; it may be adjusted later if needed.
 */
#define MIGRATE_MAX_SIZE	(256 << 20)
/* Upper bound on the number of migrate ULTs on the server */
#define MIGRATE_MAX_ULT		8192

struct migrate_one {
daos_key_t mo_dkey;
uuid_t mo_pool_uuid;
Expand Down Expand Up @@ -288,6 +296,10 @@ migrate_pool_tls_destroy(struct migrate_pool_tls *tls)

if (tls->mpt_done_eventual)
ABT_eventual_free(&tls->mpt_done_eventual);
if (tls->mpt_inflight_cond)
ABT_cond_free(&tls->mpt_inflight_cond);
if (tls->mpt_inflight_mutex)
ABT_mutex_free(&tls->mpt_inflight_mutex);
if (daos_handle_is_valid(tls->mpt_root_hdl))
obj_tree_destroy(tls->mpt_root_hdl);
if (daos_handle_is_valid(tls->mpt_migrated_root_hdl))
Expand Down Expand Up @@ -369,6 +381,14 @@ migrate_pool_tls_create_one(void *data)
if (rc != ABT_SUCCESS)
D_GOTO(out, rc = dss_abterr2der(rc));

rc = ABT_cond_create(&pool_tls->mpt_inflight_cond);
if (rc != ABT_SUCCESS)
D_GOTO(out, rc = dss_abterr2der(rc));

rc = ABT_mutex_create(&pool_tls->mpt_inflight_mutex);
if (rc != ABT_SUCCESS)
D_GOTO(out, rc = dss_abterr2der(rc));

uuid_copy(pool_tls->mpt_pool_uuid, arg->pool_uuid);
uuid_copy(pool_tls->mpt_poh_uuid, arg->pool_hdl_uuid);
uuid_copy(pool_tls->mpt_coh_uuid, arg->co_hdl_uuid);
Expand All @@ -383,7 +403,9 @@ migrate_pool_tls_create_one(void *data)
pool_tls->mpt_max_eph = arg->max_eph;
pool_tls->mpt_pool = ds_pool_child_lookup(arg->pool_uuid);
pool_tls->mpt_del_local_objs = arg->del_local_objs;

pool_tls->mpt_inflight_max_size = MIGRATE_MAX_SIZE;
pool_tls->mpt_inflight_max_ult = MIGRATE_MAX_ULT;
pool_tls->mpt_inflight_size = 0;
pool_tls->mpt_refcount = 1;
rc = daos_rank_list_copy(&pool_tls->mpt_svc_list, arg->svc_list);
if (rc)
Expand Down Expand Up @@ -1281,12 +1303,12 @@ migrate_punch(struct migrate_pool_tls *tls, struct migrate_one *mrone,
}

static int
migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone)
migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone,
daos_size_t data_size)
{
struct ds_cont_child *cont;
daos_handle_t coh = DAOS_HDL_INVAL;
daos_handle_t oh;
daos_size_t data_size;
int rc;

if (daos_handle_is_inval(tls->mpt_pool_hdl)) {
Expand Down Expand Up @@ -1330,10 +1352,8 @@ migrate_dkey(struct migrate_pool_tls *tls, struct migrate_one *mrone)
if (rc)
D_GOTO(obj_close, rc);

data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num);
D_DEBUG(DB_TRACE, "data size is "DF_U64"\n", data_size);
if (data_size == 0) {
D_DEBUG(DB_REBUILD, "skip empty iod\n");
D_DEBUG(DB_REBUILD, "empty mrone %p\n", mrone);
D_GOTO(obj_close, rc);
}

Expand Down Expand Up @@ -1391,7 +1411,8 @@ migrate_one_ult(void *arg)
{
struct migrate_one *mrone = arg;
struct migrate_pool_tls *tls;
int rc;
daos_size_t data_size;
int rc = 0;

if (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG))
dss_sleep(daos_fail_value_get() * 1000000);
Expand All @@ -1404,9 +1425,38 @@ migrate_one_ult(void *arg)
goto out;
}

rc = migrate_dkey(tls, mrone);
D_DEBUG(DB_REBUILD, DF_UOID" migrate dkey "DF_KEY" rc %d\n",
DP_UOID(mrone->mo_oid), DP_KEY(&mrone->mo_dkey), rc);
data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num);
D_DEBUG(DB_TRACE, "mrone %p data size is "DF_U64"\n",
mrone, data_size);
D_ASSERT(data_size != (daos_size_t)-1);
D_DEBUG(DB_REBUILD, "mrone %p inflight size "DF_U64" max "DF_U64"\n",
mrone, tls->mpt_inflight_size, tls->mpt_inflight_max_size);

while (tls->mpt_inflight_size + data_size >=
tls->mpt_inflight_max_size && tls->mpt_inflight_max_size != 0
&& !tls->mpt_fini) {
D_DEBUG(DB_REBUILD, "mrone %p wait "DF_U64"/"DF_U64"\n",
mrone, tls->mpt_inflight_size,
tls->mpt_inflight_max_size);
ABT_mutex_lock(tls->mpt_inflight_mutex);
ABT_cond_wait(tls->mpt_inflight_cond, tls->mpt_inflight_mutex);
ABT_mutex_unlock(tls->mpt_inflight_mutex);
}

if (tls->mpt_fini)
D_GOTO(out, rc);

tls->mpt_inflight_size += data_size;
rc = migrate_dkey(tls, mrone, data_size);
tls->mpt_inflight_size -= data_size;

ABT_mutex_lock(tls->mpt_inflight_mutex);
ABT_cond_broadcast(tls->mpt_inflight_cond);
ABT_mutex_unlock(tls->mpt_inflight_mutex);

D_DEBUG(DB_REBUILD, DF_UOID" migrate dkey "DF_KEY" inflight "DF_U64
" rc %d\n", DP_UOID(mrone->mo_oid), DP_KEY(&mrone->mo_dkey),
tls->mpt_inflight_size, rc);

/* Ignore nonexistent error because puller could race
* with user's container destroy:
Expand Down Expand Up @@ -2127,6 +2177,10 @@ ds_migrate_fini_one(uuid_t pool_uuid, uint32_t ver)
return;

tls->mpt_fini = 1;

ABT_mutex_lock(tls->mpt_inflight_mutex);
ABT_cond_broadcast(tls->mpt_inflight_cond);
ABT_mutex_unlock(tls->mpt_inflight_mutex);
migrate_pool_tls_put(tls); /* lookup */
migrate_pool_tls_put(tls); /* destroy */
}
Expand All @@ -2149,6 +2203,10 @@ migrate_fini_one_ult(void *data)
D_ASSERT(tls->mpt_refcount > 1);
tls->mpt_fini = 1;

ABT_mutex_lock(tls->mpt_inflight_mutex);
ABT_cond_broadcast(tls->mpt_inflight_cond);
ABT_mutex_unlock(tls->mpt_inflight_mutex);

ABT_eventual_wait(tls->mpt_done_eventual, NULL);
migrate_pool_tls_put(tls); /* destroy */

Expand Down Expand Up @@ -2516,6 +2574,37 @@ migrate_cont_iter_cb(daos_handle_t ih, d_iov_t *key_iov,
arg.pool_tls = tls;
uuid_copy(arg.cont_uuid, cont_uuid);
while (!dbtree_is_empty(root->root_hdl)) {
uint64_t ult_cnt;

D_ASSERT(tls->mpt_obj_generated_ult >=
tls->mpt_obj_executed_ult);
D_ASSERT(tls->mpt_generated_ult >= tls->mpt_executed_ult);

ult_cnt = max(tls->mpt_obj_generated_ult -
tls->mpt_obj_executed_ult,
tls->mpt_generated_ult -
tls->mpt_executed_ult);

while (ult_cnt >= tls->mpt_inflight_max_ult && !tls->mpt_fini) {
ABT_mutex_lock(tls->mpt_inflight_mutex);
ABT_cond_wait(tls->mpt_inflight_cond,
tls->mpt_inflight_mutex);
ABT_mutex_unlock(tls->mpt_inflight_mutex);
ult_cnt = max(tls->mpt_obj_generated_ult -
tls->mpt_obj_executed_ult,
tls->mpt_generated_ult -
tls->mpt_executed_ult);
D_DEBUG(DB_REBUILD, "obj "DF_U64"/"DF_U64", key"
DF_U64"/"DF_U64" "DF_U64"\n",
tls->mpt_obj_generated_ult,
tls->mpt_obj_executed_ult,
tls->mpt_generated_ult,
tls->mpt_executed_ult, ult_cnt);
}

if (tls->mpt_fini)
break;

rc = dbtree_iterate(root->root_hdl, DAOS_INTENT_MIGRATION,
false, migrate_obj_iter_cb, &arg);
if (rc || tls->mpt_fini)
Expand Down Expand Up @@ -2865,13 +2954,20 @@ ds_migrate_query_status(uuid_t pool_uuid, uint32_t ver,
* do collective on 0 xstream
**/
arg.obj_generated_ult += tls->mpt_obj_generated_ult;
tls->mpt_obj_executed_ult = arg.obj_executed_ult;
tls->mpt_generated_ult = arg.generated_ult;
tls->mpt_executed_ult = arg.executed_ult;
*dms = arg.dms;
if (arg.obj_generated_ult > arg.obj_executed_ult ||
arg.generated_ult > arg.executed_ult || tls->mpt_ult_running)
dms->dm_migrating = 1;
else
dms->dm_migrating = 0;

ABT_mutex_lock(tls->mpt_inflight_mutex);
ABT_cond_broadcast(tls->mpt_inflight_cond);
ABT_mutex_unlock(tls->mpt_inflight_mutex);

D_DEBUG(DB_REBUILD, "pool "DF_UUID" migrating=%s,"
" obj_count="DF_U64", rec_count="DF_U64
" size="DF_U64" obj %u/%u general %u/%u status %d\n",
Expand Down