Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-6967 vos: Data loss on updates after uncommitted punch #4899

Merged
merged 17 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/common/btree.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ struct btr_context {
* or delete because the probe path could have been changed.
*/
int tc_probe_rc;
/** Mark the current entry as aborted */
bool tc_aborted;
/** refcount, used by iterator */
int tc_ref;
/** cached tree class, avoid loading from slow memory */
Expand Down Expand Up @@ -918,6 +920,8 @@ btr_check_availability(struct btr_context *tcx, struct btr_check_alb *alb)
struct btr_record *rec;
int rc;

tcx->tc_aborted = false;

if (btr_ops(tcx)->to_check_availability == NULL)
return PROBE_RC_OK;

Expand Down Expand Up @@ -959,6 +963,9 @@ btr_check_availability(struct btr_context *tcx, struct btr_check_alb *alb)
*/
case ALB_AVAILABLE_CLEAN:
return PROBE_RC_OK;
case ALB_AVAILABLE_ABORTED:
tcx->tc_aborted = true;
return PROBE_RC_OK;
case ALB_UNAVAILABLE:
default:
/* Unavailable */
Expand Down Expand Up @@ -3658,6 +3665,10 @@ dbtree_iter_delete(daos_handle_t ih, void *args)
if (rc != 0)
return rc;

/** The probed entry is not committed so return -DER_TX_BUSY */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why an aborted entry can't be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. This needs to be for in-progress entries. We need another way to determine if an entry is aborted. I don't think the dtx_status stuff is working for single value

if (tcx->tc_aborted)
return -DER_TX_BUSY;

rc = btr_tx_delete(tcx, args);

/* reset iterator */
Expand Down
50 changes: 37 additions & 13 deletions src/vos/ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,7 @@ struct agg_arg {
const struct ilog_entry *aa_prior_punch;
daos_epoch_t aa_punched;
bool aa_discard;
uint16_t aa_punched_minor;
};

enum {
Expand All @@ -1475,26 +1476,47 @@ enum {
AGG_RC_ABORT,
};

static bool
entry_punched(const struct ilog_entry *entry, const struct agg_arg *agg_arg)
{
uint16_t minor_epc = MAX(entry->ie_id.id_punch_minor_eph,
entry->ie_id.id_update_minor_eph);

if (entry->ie_id.id_epoch > agg_arg->aa_punched)
return false;

if (entry->ie_id.id_epoch < agg_arg->aa_punched)
return true;

return minor_epc <= agg_arg->aa_punched_minor;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (entry->ie_id.id_punch_minor_eph > entry->ie_id.id_update_minor_eph &&
entry->ie_id.id_punch_minor_eph > agg_arg->aa_punched_minor &&
entry->ie_id.id_epoch == agg_arg->aa_punched)

Then it is a punched entry or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the entry is punched (e.g. punch_minor > update_minor) then we will determine later that the entry is punched. But this check is only about whether it is punched entirely by the parent.

}

static int
check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
{
int rc;
int rc;
bool parent_punched = false;
uint16_t minor_epc = MAX(entry->ie_id.id_punch_minor_eph,
entry->ie_id.id_update_minor_eph);

D_DEBUG(DB_TRACE, "Entry "DF_X64" punch=%s prev="DF_X64
D_DEBUG(DB_TRACE, "Entry "DF_X64".%d punch=%s prev="DF_X64
" prior_punch="DF_X64"\n", entry->ie_id.id_epoch,
ilog_is_punch(entry) ? "yes" : "no",
minor_epc, ilog_is_punch(entry) ? "yes" : "no",
agg_arg->aa_prev ? agg_arg->aa_prev->ie_id.id_epoch : 0,
agg_arg->aa_prior_punch ?
agg_arg->aa_prior_punch->ie_id.id_epoch : 0);

if (entry->ie_id.id_epoch > agg_arg->aa_epr->epr_hi)
D_GOTO(done, rc = AGG_RC_DONE);

/* Abort ilog aggregation on hitting any uncommitted entry */
if (entry->ie_status == ILOG_UNCOMMITTED)
D_GOTO(done, rc = AGG_RC_ABORT);

if (entry->ie_id.id_epoch > agg_arg->aa_epr->epr_hi)
D_GOTO(done, rc = AGG_RC_DONE);
parent_punched = entry_punched(entry, agg_arg);
if (entry->ie_id.id_epoch < agg_arg->aa_epr->epr_lo) {
if (entry->ie_id.id_epoch <= agg_arg->aa_punched) {
if (parent_punched) {
/* Skip entries outside of the range and
* punched by the parent
*/
Expand All @@ -1514,7 +1536,7 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
D_ASSERT(entry->ie_status != ILOG_UNCOMMITTED);

if (agg_arg->aa_discard || entry->ie_status == ILOG_REMOVED ||
agg_arg->aa_punched >= entry->ie_id.id_epoch) {
parent_punched) {
/* Remove stale entry or punched entry */
D_GOTO(done, rc = AGG_RC_REMOVE);
}
Expand All @@ -1525,7 +1547,7 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)

if (!punch) {
/* punched by outer level */
punch = prev->ie_id.id_epoch <= agg_arg->aa_punched;
punch = entry_punched(prev, agg_arg);
}
if (ilog_is_punch(entry) == punch) {
/* Remove redundant entry */
Expand Down Expand Up @@ -1560,7 +1582,8 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
int
ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
const struct ilog_desc_cbs *cbs, const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched, struct ilog_entries *entries)
bool discard, daos_epoch_t punched_major, uint16_t punched_minor,
struct ilog_entries *entries)
{
struct ilog_priv *priv = ilog_ent2priv(entries);
struct ilog_context *lctx;
Expand All @@ -1576,11 +1599,11 @@ ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
daos_handle_t toh = DAOS_HDL_INVAL;

D_ASSERT(epr != NULL);
D_ASSERT(punched <= epr->epr_hi);
D_ASSERT(punched_major <= epr->epr_hi);

D_DEBUG(DB_TRACE, "%s incarnation log: epr: "DF_X64"-"DF_X64" punched="
DF_X64"\n", discard ? "Discard" : "Aggregate", epr->epr_lo,
epr->epr_hi, punched);
DF_X64".%d\n", discard ? "Discard" : "Aggregate", epr->epr_lo,
epr->epr_hi, punched_major, punched_minor);

/* This can potentially be optimized but using ilog_fetch gets some code
* reuse.
Expand All @@ -1603,7 +1626,8 @@ ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
agg_arg.aa_epr = epr;
agg_arg.aa_prev = NULL;
agg_arg.aa_prior_punch = NULL;
agg_arg.aa_punched = punched;
agg_arg.aa_punched = punched_major;
agg_arg.aa_punched_minor = punched_minor;
agg_arg.aa_discard = discard;

if (root->lr_tree.it_embedded) {
Expand Down
5 changes: 3 additions & 2 deletions src/vos/ilog.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ struct ilog_entries {
* that are provably not needed. If discard is
* set, it will remove everything in the epoch
* range.
* \param punched[in] Max punch of parent incarnation log
* \param punch_major[in] Max major epoch punch of parent incarnation log
* \param punch_major[in] Max minor epoch punch of parent incarnation log
* \param entries[in] Used for efficiency since aggregation is used
* by vos_iterator
*
Expand All @@ -201,7 +202,7 @@ struct ilog_entries {
int
ilog_aggregate(struct umem_instance *umm, struct ilog_df *root,
const struct ilog_desc_cbs *cbs, const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched,
bool discard, daos_epoch_t punch_major, uint16_t punch_minor,
struct ilog_entries *entries);

/** Initialize an ilog_entries struct for fetch
Expand Down
15 changes: 9 additions & 6 deletions src/vos/tests/vts_ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ ilog_test_aggregate(void **state)
commit_all();
epr.epr_lo = 2;
epr.epr_hi = 4;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -895,7 +895,7 @@ ilog_test_aggregate(void **state)

epr.epr_lo = 0;
epr.epr_hi = 6;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -910,7 +910,7 @@ ilog_test_aggregate(void **state)
version_cache_fetch(&version_cache, loh, true);
commit_all();
epr.epr_hi = 7;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate log entry\n");
Expand Down Expand Up @@ -987,7 +987,8 @@ ilog_test_discard(void **state)
commit_all();
epr.epr_lo = 2;
epr.epr_hi = 4;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);

Expand All @@ -1009,7 +1010,8 @@ ilog_test_discard(void **state)
commit_all();
epr.epr_lo = 0;
epr.epr_hi = 6;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -1026,7 +1028,8 @@ ilog_test_discard(void **state)
commit_all();

epr.epr_hi = 7;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand Down
113 changes: 113 additions & 0 deletions src/vos/tests/vts_pm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2334,6 +2334,118 @@ many_tx(void **state)
start_epoch = epoch + 1;
}

static struct dtx_id
execute_op(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch,
daos_key_t *dkey, daos_key_t *akey, d_sg_list_t *sgl,
char *buf, int len, bool commit, int op)
{
struct vos_ioreq req = {0};
daos_iod_t iod = {0};
int rc;

vts_dtx_begin(&oid, coh, epoch, 0, &req.dth);

req.oid = oid;
req.coh = coh;
req.xid = req.dth->dth_xid;
req.flags = 0;
req.dkey = dkey;
req.akey = akey;
if (akey)
req.akey_nr = 1;

if (op <= TX_OP_PUNCH_AKEY) {
do_punch(&req);
goto do_commit;
}

iod.iod_type = DAOS_IOD_SINGLE;
iod.iod_recxs = NULL;
iod.iod_nr = 1;
req.akey = NULL;
req.iod = &iod;
iod.iod_name = *akey;
iod.iod_size = len;
d_iov_set(&sgl->sg_iovs[0], (void *)buf, iod.iod_size);
sgl->sg_nr = 1;
sgl->sg_nr_out = 0;
req.sgl = sgl;
req.fetch_sgl = sgl;
do_io(&req, op);
do_commit:
vts_dtx_end(req.dth);
if (commit && req.commit) {
rc = vos_dtx_commit(coh, &req.xid, 1, NULL);
assert_rc_equal(rc, 1);
}

return req.xid;
}


static void
uncommitted_parent(void **state)
{
struct io_test_args *arg = *state;
int rc = 0;
daos_key_t dkey;
daos_key_t akey[2];
daos_iod_t iod;
d_sg_list_t sgl;
char buf[32];
daos_epoch_t epoch = start_epoch;
daos_handle_t coh;
char *first = "Hello";
char dkey_buf[UPDATE_DKEY_SIZE];
char akey_buf[2][UPDATE_AKEY_SIZE];
daos_unit_oid_t oid;
struct dtx_id xid;

test_args_reset(arg, VPOOL_SIZE);
coh = arg->ctx.tc_co_hdl;

memset(&iod, 0, sizeof(iod));

rc = d_sgl_init(&sgl, 1);
assert_rc_equal(rc, 0);

/* Set up dkey and akey */
oid = gen_oid(arg->ofeat);
vts_key_gen(&dkey_buf[0], arg->dkey_size, true, arg);
set_iov(&dkey, &dkey_buf[0], arg->ofeat & DAOS_OF_DKEY_UINT64);
vts_key_gen(&akey_buf[0][0], arg->akey_size, true, arg);
set_iov(&akey[0], &akey_buf[0][0], arg->ofeat & DAOS_OF_AKEY_UINT64);
vts_key_gen(&akey_buf[1][0], arg->akey_size, true, arg);
set_iov(&akey[1], &akey_buf[1][0], arg->ofeat & DAOS_OF_AKEY_UINT64);

execute_op(coh, oid, epoch, &dkey, &akey[0], &sgl, first, 5, true,
TX_OP_UPDATE1);
epoch += 10;
xid = execute_op(coh, oid, epoch, NULL, NULL, NULL, NULL, 0, false,
TX_OP_PUNCH_OBJ);
epoch += 10;
execute_op(coh, oid, epoch, &dkey, &akey[1], &sgl, first, 5, true,
TX_OP_UPDATE1);
/** Commit the punch */
rc = vos_dtx_commit(coh, &xid, 1, NULL);
assert_rc_equal(rc, 1);

memset(buf, 'x', sizeof(buf));
epoch += 10;
execute_op(coh, oid, epoch, &dkey, &akey[0], &sgl, buf, 5, true,
TX_OP_FETCH1);
assert_memory_equal(buf, "xxxxx", 5);

memset(buf, 'x', sizeof(buf));
epoch += 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unnecessary to bump the epoch again for another fetch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True but it also doesn't hurt anything

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not hurt anything, just redundant.

execute_op(coh, oid, epoch, &dkey, &akey[1], &sgl, buf, 5, true,
TX_OP_FETCH1);
assert_memory_equal(buf, first, 5);

d_sgl_fini(&sgl, false);
start_epoch = epoch + 1;
}

static void
test_multiple_key_conditionals_common(void **state, bool with_dtx)
{
Expand Down Expand Up @@ -2572,6 +2684,7 @@ static const struct CMUnitTest punch_model_tests_pmdk[] = {
{ "VOS864: Multikey conditionals with tx",
test_multiple_key_conditionals_tx, NULL, NULL },
{ "VOS865: Many transactions", many_tx, NULL, NULL },
{ "VOS866: Uncommitted parent punch", uncommitted_parent, NULL, NULL },
};

static const struct CMUnitTest punch_model_tests_all[] = {
Expand Down
Loading