Skip to content

Commit

Permalink
DAOS-6967 vos: Data loss on updates after uncommitted punch (#4899)
Browse files Browse the repository at this point in the history
The main issue we are trying to prevent is removal of a subtree that isn't empty.
This typically means that we have a bug in aggregation or in the incarnation log.

This patch does the following:

Modifies the btree code so that it returns aborted entries during aggregation.
Modify ilog_aggregate so it takes a minor epoch into account. Previously, if we had a punch of a parent, it would remove the child at the same major epoch even if it had updates afterward.
Modify the incarnation log to ensure we take the punch of a parent into account when doing an update
Modify aggregation so that we skip aggregation of parent objects if a child iterator hits -DER_TX_BUSY. This avoids removing orphaned subtrees.
Aggregation of the incarnation log was returning -DER_TX_BUSY for entries that are after the aggregated range. Moved the check to after the range check.
Modify evtree aggregation to restart the current tree in case of removing an aborted entry rather than returning -DER_TX_BUSY which would cause upper layer to abort too.
Fixed a few other issues with aggregation.

Signed-off-by: Jeff Olivier <jeffrey.v.olivier@intel.com>
  • Loading branch information
jolivier23 committed Mar 17, 2021
1 parent c4fc2df commit 4ac0f3e
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 68 deletions.
2 changes: 2 additions & 0 deletions src/common/btree.c
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,8 @@ btr_check_availability(struct btr_context *tcx, struct btr_check_alb *alb)
* for the case in the new aggregation logic.
* But before that, just make it fall through.
*/
case ALB_AVAILABLE_ABORTED:
/** NB: Entry has the aborted flag set and we can purge it */
case ALB_AVAILABLE_CLEAN:
return PROBE_RC_OK;
case ALB_UNAVAILABLE:
Expand Down
2 changes: 2 additions & 0 deletions src/include/daos_srv/vos_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ enum {
VOS_ITER_CB_SKIP = (1UL << 2),
/** Abort current level iteration */
VOS_ITER_CB_ABORT = (1UL << 3),
/** Abort the current level iterator and restart */
VOS_ITER_CB_RESTART = (1UL << 4),
};

/**
Expand Down
50 changes: 37 additions & 13 deletions src/vos/ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,7 @@ struct agg_arg {
const struct ilog_entry *aa_prior_punch;
daos_epoch_t aa_punched;
bool aa_discard;
uint16_t aa_punched_minor;
};

enum {
Expand All @@ -1475,26 +1476,47 @@ enum {
AGG_RC_ABORT,
};

/**
 * Decide whether an incarnation log entry is covered by the parent punch
 * recorded in \a agg_arg (aa_punched / aa_punched_minor).
 *
 * The major epochs are compared first. Only when they are equal does the
 * minor epoch break the tie; the entry's minor epoch is taken as the larger
 * of its punch and update minor epochs.
 *
 * \param entry[in]	Entry to test
 * \param agg_arg[in]	Aggregation state holding the parent punch epoch
 *
 * \return		true if the entry is at or before the parent punch
 */
static bool
entry_punched(const struct ilog_entry *entry, const struct agg_arg *agg_arg)
{
	uint16_t	entry_minor;

	/* Different major epochs: the ordering alone settles it */
	if (entry->ie_id.id_epoch != agg_arg->aa_punched)
		return entry->ie_id.id_epoch < agg_arg->aa_punched;

	/* Same major epoch: fall back to the minor epoch */
	entry_minor = MAX(entry->ie_id.id_punch_minor_eph,
			  entry->ie_id.id_update_minor_eph);

	return entry_minor <= agg_arg->aa_punched_minor;
}

static int
check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
{
int rc;
int rc;
bool parent_punched = false;
uint16_t minor_epc = MAX(entry->ie_id.id_punch_minor_eph,
entry->ie_id.id_update_minor_eph);

D_DEBUG(DB_TRACE, "Entry "DF_X64" punch=%s prev="DF_X64
D_DEBUG(DB_TRACE, "Entry "DF_X64".%d punch=%s prev="DF_X64
" prior_punch="DF_X64"\n", entry->ie_id.id_epoch,
ilog_is_punch(entry) ? "yes" : "no",
minor_epc, ilog_is_punch(entry) ? "yes" : "no",
agg_arg->aa_prev ? agg_arg->aa_prev->ie_id.id_epoch : 0,
agg_arg->aa_prior_punch ?
agg_arg->aa_prior_punch->ie_id.id_epoch : 0);

if (entry->ie_id.id_epoch > agg_arg->aa_epr->epr_hi)
D_GOTO(done, rc = AGG_RC_DONE);

/* Abort ilog aggregation on hitting any uncommitted entry */
if (entry->ie_status == ILOG_UNCOMMITTED)
D_GOTO(done, rc = AGG_RC_ABORT);

if (entry->ie_id.id_epoch > agg_arg->aa_epr->epr_hi)
D_GOTO(done, rc = AGG_RC_DONE);
parent_punched = entry_punched(entry, agg_arg);
if (entry->ie_id.id_epoch < agg_arg->aa_epr->epr_lo) {
if (entry->ie_id.id_epoch <= agg_arg->aa_punched) {
if (parent_punched) {
/* Skip entries outside of the range and
* punched by the parent
*/
Expand All @@ -1514,7 +1536,7 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
D_ASSERT(entry->ie_status != ILOG_UNCOMMITTED);

if (agg_arg->aa_discard || entry->ie_status == ILOG_REMOVED ||
agg_arg->aa_punched >= entry->ie_id.id_epoch) {
parent_punched) {
/* Remove stale entry or punched entry */
D_GOTO(done, rc = AGG_RC_REMOVE);
}
Expand All @@ -1525,7 +1547,7 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)

if (!punch) {
/* punched by outer level */
punch = prev->ie_id.id_epoch <= agg_arg->aa_punched;
punch = entry_punched(prev, agg_arg);
}
if (ilog_is_punch(entry) == punch) {
/* Remove redundant entry */
Expand Down Expand Up @@ -1560,7 +1582,8 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
int
ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
const struct ilog_desc_cbs *cbs, const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched, struct ilog_entries *entries)
bool discard, daos_epoch_t punched_major, uint16_t punched_minor,
struct ilog_entries *entries)
{
struct ilog_priv *priv = ilog_ent2priv(entries);
struct ilog_context *lctx;
Expand All @@ -1576,11 +1599,11 @@ ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
daos_handle_t toh = DAOS_HDL_INVAL;

D_ASSERT(epr != NULL);
D_ASSERT(punched <= epr->epr_hi);
D_ASSERT(punched_major <= epr->epr_hi);

D_DEBUG(DB_TRACE, "%s incarnation log: epr: "DF_X64"-"DF_X64" punched="
DF_X64"\n", discard ? "Discard" : "Aggregate", epr->epr_lo,
epr->epr_hi, punched);
DF_X64".%d\n", discard ? "Discard" : "Aggregate", epr->epr_lo,
epr->epr_hi, punched_major, punched_minor);

/* This can potentially be optimized but using ilog_fetch gets some code
* reuse.
Expand All @@ -1603,7 +1626,8 @@ ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
agg_arg.aa_epr = epr;
agg_arg.aa_prev = NULL;
agg_arg.aa_prior_punch = NULL;
agg_arg.aa_punched = punched;
agg_arg.aa_punched = punched_major;
agg_arg.aa_punched_minor = punched_minor;
agg_arg.aa_discard = discard;

if (root->lr_tree.it_embedded) {
Expand Down
5 changes: 3 additions & 2 deletions src/vos/ilog.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ struct ilog_entries {
* that are provably not needed. If discard is
* set, it will remove everything in the epoch
* range.
* \param punched[in] Max punch of parent incarnation log
* \param punch_major[in] Max major epoch punch of parent incarnation log
* \param punch_minor[in] Max minor epoch punch of parent incarnation log
* \param entries[in] Used for efficiency since aggregation is used
* by vos_iterator
*
Expand All @@ -201,7 +202,7 @@ struct ilog_entries {
int
ilog_aggregate(struct umem_instance *umm, struct ilog_df *root,
const struct ilog_desc_cbs *cbs, const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched,
bool discard, daos_epoch_t punch_major, uint16_t punch_minor,
struct ilog_entries *entries);

/** Initialize an ilog_entries struct for fetch
Expand Down
15 changes: 9 additions & 6 deletions src/vos/tests/vts_ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ ilog_test_aggregate(void **state)
commit_all();
epr.epr_lo = 2;
epr.epr_hi = 4;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -895,7 +895,7 @@ ilog_test_aggregate(void **state)

epr.epr_lo = 0;
epr.epr_hi = 6;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -910,7 +910,7 @@ ilog_test_aggregate(void **state)
version_cache_fetch(&version_cache, loh, true);
commit_all();
epr.epr_hi = 7;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate log entry\n");
Expand Down Expand Up @@ -987,7 +987,8 @@ ilog_test_discard(void **state)
commit_all();
epr.epr_lo = 2;
epr.epr_hi = 4;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);

Expand All @@ -1009,7 +1010,8 @@ ilog_test_discard(void **state)
commit_all();
epr.epr_lo = 0;
epr.epr_hi = 6;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -1026,7 +1028,8 @@ ilog_test_discard(void **state)
commit_all();

epr.epr_hi = 7;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand Down
113 changes: 113 additions & 0 deletions src/vos/tests/vts_pm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2334,6 +2334,118 @@ many_tx(void **state)
start_epoch = epoch + 1;
}

/**
 * Run a single operation inside its own DTX and optionally commit it.
 *
 * Ops numbered at or below TX_OP_PUNCH_AKEY are handed to do_punch(). Any
 * other op is issued through do_io() as a single-value I/O of \a len bytes
 * on \a akey, with \a buf attached to the first iov of \a sgl.
 *
 * \param coh[in]	Container handle
 * \param oid[in]	Object id
 * \param epoch[in]	Epoch passed to vts_dtx_begin()
 * \param dkey[in]	Dkey stored in the request
 * \param akey[in]	Akey; dereferenced for the I/O path
 * \param sgl[in]	Scatter/gather list; only used for the I/O path
 * \param buf[in]	Data buffer; only used for the I/O path
 * \param len[in]	Value size; only used for the I/O path
 * \param commit[in]	Commit the transaction when req.commit is also set
 * \param op[in]	TX_OP_* operation code
 *
 * \return		The DTX id of the transaction that was used
 */
static struct dtx_id
execute_op(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch,
	   daos_key_t *dkey, daos_key_t *akey, d_sg_list_t *sgl,
	   char *buf, int len, bool commit, int op)
{
	struct vos_ioreq	req = {0};
	daos_iod_t		iod = {0};
	int			rc;

	vts_dtx_begin(&oid, coh, epoch, 0, &req.dth);

	req.coh = coh;
	req.oid = oid;
	req.xid = req.dth->dth_xid;
	req.flags = 0;
	req.dkey = dkey;
	req.akey = akey;
	if (akey != NULL)
		req.akey_nr = 1;

	if (op <= TX_OP_PUNCH_AKEY) {
		do_punch(&req);
	} else {
		/* I/O path: the akey is carried by the iod, not the request */
		req.akey = NULL;

		iod.iod_name = *akey;
		iod.iod_type = DAOS_IOD_SINGLE;
		iod.iod_size = len;
		iod.iod_nr = 1;
		iod.iod_recxs = NULL;
		req.iod = &iod;

		d_iov_set(&sgl->sg_iovs[0], (void *)buf, iod.iod_size);
		sgl->sg_nr = 1;
		sgl->sg_nr_out = 0;
		req.sgl = sgl;
		req.fetch_sgl = sgl;

		do_io(&req, op);
	}

	vts_dtx_end(req.dth);

	if (commit && req.commit) {
		rc = vos_dtx_commit(coh, &req.xid, 1, NULL);
		assert_rc_equal(rc, 1);
	}

	return req.xid;
}


/**
 * VOS866: update issued after a not-yet-committed object punch.
 *
 * Sequence exercised:
 *  1. Update akey[0] and commit.
 *  2. Punch the object at a later epoch but leave the punch uncommitted.
 *  3. Update akey[1] at a still later epoch and commit.
 *  4. Commit the punch from step 2.
 *  5. Fetch akey[0]: the buffer is expected to stay untouched.
 *  6. Fetch akey[1]: the buffer is expected to hold the value from step 3.
 *
 * \param state[in]	cmocka state; points at the struct io_test_args
 */
static void
uncommitted_parent(void **state)
{
	struct io_test_args	*arg = *state;
	int			 rc = 0;
	int			 i;
	daos_key_t		 dkey;
	daos_key_t		 akey[2];
	d_sg_list_t		 sgl;
	char			 buf[32];
	daos_epoch_t		 epoch = start_epoch;
	daos_handle_t		 coh;
	char			*first = "Hello";
	char			 dkey_buf[UPDATE_DKEY_SIZE];
	char			 akey_buf[2][UPDATE_AKEY_SIZE];
	daos_unit_oid_t		 oid;
	struct dtx_id		 xid;

	test_args_reset(arg, VPOOL_SIZE);
	coh = arg->ctx.tc_co_hdl;

	rc = d_sgl_init(&sgl, 1);
	assert_rc_equal(rc, 0);

	/* Set up one dkey and two akeys (generated in dkey, akey order) */
	oid = gen_oid(arg->ofeat);
	vts_key_gen(&dkey_buf[0], arg->dkey_size, true, arg);
	set_iov(&dkey, &dkey_buf[0], arg->ofeat & DAOS_OF_DKEY_UINT64);
	for (i = 0; i < 2; i++) {
		vts_key_gen(&akey_buf[i][0], arg->akey_size, true, arg);
		set_iov(&akey[i], &akey_buf[i][0],
			arg->ofeat & DAOS_OF_AKEY_UINT64);
	}

	/* Committed update of the first akey */
	execute_op(coh, oid, epoch, &dkey, &akey[0], &sgl, first, 5, true,
		   TX_OP_UPDATE1);

	/* Object punch that is deliberately left uncommitted for now */
	epoch += 10;
	xid = execute_op(coh, oid, epoch, NULL, NULL, NULL, NULL, 0, false,
			 TX_OP_PUNCH_OBJ);

	/* Committed update of the second akey, after the pending punch */
	epoch += 10;
	execute_op(coh, oid, epoch, &dkey, &akey[1], &sgl, first, 5, true,
		   TX_OP_UPDATE1);

	/** Commit the punch */
	rc = vos_dtx_commit(coh, &xid, 1, NULL);
	assert_rc_equal(rc, 1);

	/* First akey predates the punch: fetch must not overwrite the 'x's */
	memset(buf, 'x', sizeof(buf));
	epoch += 10;
	execute_op(coh, oid, epoch, &dkey, &akey[0], &sgl, buf, 5, true,
		   TX_OP_FETCH1);
	assert_memory_equal(buf, "xxxxx", 5);

	/* Second akey was written after the punch: its value must be read */
	memset(buf, 'x', sizeof(buf));
	epoch += 10;
	execute_op(coh, oid, epoch, &dkey, &akey[1], &sgl, buf, 5, true,
		   TX_OP_FETCH1);
	assert_memory_equal(buf, first, 5);

	d_sgl_fini(&sgl, false);
	start_epoch = epoch + 1;
}

static void
test_multiple_key_conditionals_common(void **state, bool with_dtx)
{
Expand Down Expand Up @@ -2572,6 +2684,7 @@ static const struct CMUnitTest punch_model_tests_pmdk[] = {
{ "VOS864: Multikey conditionals with tx",
test_multiple_key_conditionals_tx, NULL, NULL },
{ "VOS865: Many transactions", many_tx, NULL, NULL },
{ "VOS866: Uncommitted parent punch", uncommitted_parent, NULL, NULL },
};

static const struct CMUnitTest punch_model_tests_all[] = {
Expand Down
Loading

0 comments on commit 4ac0f3e

Please sign in to comment.