Skip to content

Commit

Permalink
* Modify ilog_aggregate so it takes minor epoch with parent punch
Browse files Browse the repository at this point in the history
* Modify aggregate to bypass aggregation if child tree was aborted
* Fix an issue with incarnation log.  We don't need to abort
  aggregation if the uncommitted entry is after the aggregation period
  so move the check.

Note, there is an issue with single value still because it doesn't
abort the aggregation under such cases so this is a WIP

Skip-test: true

Signed-off-by: Jeff Olivier <jeffrey.v.olivier@intel.com>
  • Loading branch information
jolivier23 committed Mar 11, 2021
1 parent fa892ce commit 318aaf1
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 30 deletions.
50 changes: 37 additions & 13 deletions src/vos/ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,7 @@ struct agg_arg {
const struct ilog_entry *aa_prior_punch;
daos_epoch_t aa_punched;
bool aa_discard;
uint16_t aa_punched_minor;
};

enum {
Expand All @@ -1475,26 +1476,47 @@ enum {
AGG_RC_ABORT,
};

static bool
entry_punched(const struct ilog_entry *entry, const struct agg_arg *agg_arg)
{
uint16_t minor_epc = MAX(entry->ie_id.id_punch_minor_eph,
entry->ie_id.id_update_minor_eph);

if (entry->ie_id.id_epoch > agg_arg->aa_punched)
return false;

if (entry->ie_id.id_epoch < agg_arg->aa_punched)
return true;

return minor_epc <= agg_arg->aa_punched_minor;

}

static int
check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
{
int rc;
int rc;
bool parent_punched = false;
uint16_t minor_epc = MAX(entry->ie_id.id_punch_minor_eph,
entry->ie_id.id_update_minor_eph);

D_DEBUG(DB_TRACE, "Entry "DF_X64" punch=%s prev="DF_X64
D_DEBUG(DB_TRACE, "Entry "DF_X64".%d punch=%s prev="DF_X64
" prior_punch="DF_X64"\n", entry->ie_id.id_epoch,
ilog_is_punch(entry) ? "yes" : "no",
minor_epc, ilog_is_punch(entry) ? "yes" : "no",
agg_arg->aa_prev ? agg_arg->aa_prev->ie_id.id_epoch : 0,
agg_arg->aa_prior_punch ?
agg_arg->aa_prior_punch->ie_id.id_epoch : 0);

if (entry->ie_id.id_epoch > agg_arg->aa_epr->epr_hi)
D_GOTO(done, rc = AGG_RC_DONE);

/* Abort ilog aggregation on hitting any uncommitted entry */
if (entry->ie_status == ILOG_UNCOMMITTED)
D_GOTO(done, rc = AGG_RC_ABORT);

if (entry->ie_id.id_epoch > agg_arg->aa_epr->epr_hi)
D_GOTO(done, rc = AGG_RC_DONE);
parent_punched = entry_punched(entry, agg_arg);
if (entry->ie_id.id_epoch < agg_arg->aa_epr->epr_lo) {
if (entry->ie_id.id_epoch <= agg_arg->aa_punched) {
if (parent_punched) {
/* Skip entries outside of the range and
* punched by the parent
*/
Expand All @@ -1514,7 +1536,7 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
D_ASSERT(entry->ie_status != ILOG_UNCOMMITTED);

if (agg_arg->aa_discard || entry->ie_status == ILOG_REMOVED ||
agg_arg->aa_punched >= entry->ie_id.id_epoch) {
parent_punched) {
/* Remove stale entry or punched entry */
D_GOTO(done, rc = AGG_RC_REMOVE);
}
Expand All @@ -1525,7 +1547,7 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)

if (!punch) {
/* punched by outer level */
punch = prev->ie_id.id_epoch <= agg_arg->aa_punched;
punch = entry_punched(prev, agg_arg);
}
if (ilog_is_punch(entry) == punch) {
/* Remove redundant entry */
Expand Down Expand Up @@ -1560,7 +1582,8 @@ check_agg_entry(const struct ilog_entry *entry, struct agg_arg *agg_arg)
int
ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
const struct ilog_desc_cbs *cbs, const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched, struct ilog_entries *entries)
bool discard, daos_epoch_t punched_major, uint16_t punched_minor,
struct ilog_entries *entries)
{
struct ilog_priv *priv = ilog_ent2priv(entries);
struct ilog_context *lctx;
Expand All @@ -1576,11 +1599,11 @@ ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
daos_handle_t toh = DAOS_HDL_INVAL;

D_ASSERT(epr != NULL);
D_ASSERT(punched <= epr->epr_hi);
D_ASSERT(punched_major <= epr->epr_hi);

D_DEBUG(DB_TRACE, "%s incarnation log: epr: "DF_X64"-"DF_X64" punched="
DF_X64"\n", discard ? "Discard" : "Aggregate", epr->epr_lo,
epr->epr_hi, punched);
DF_X64".%d\n", discard ? "Discard" : "Aggregate", epr->epr_lo,
epr->epr_hi, punched_major, punched_minor);

/* This can potentially be optimized but using ilog_fetch gets some code
* reuse.
Expand All @@ -1603,7 +1626,8 @@ ilog_aggregate(struct umem_instance *umm, struct ilog_df *ilog,
agg_arg.aa_epr = epr;
agg_arg.aa_prev = NULL;
agg_arg.aa_prior_punch = NULL;
agg_arg.aa_punched = punched;
agg_arg.aa_punched = punched_major;
agg_arg.aa_punched_minor = punched_minor;
agg_arg.aa_discard = discard;

if (root->lr_tree.it_embedded) {
Expand Down
5 changes: 3 additions & 2 deletions src/vos/ilog.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ struct ilog_entries {
* that are provably not needed. If discard is
* set, it will remove everything in the epoch
* range.
* \param punched[in] Max punch of parent incarnation log
* \param punch_major[in] Max major epoch punch of parent incarnation log
* \param punch_major[in] Max minor epoch punch of parent incarnation log
* \param entries[in] Used for efficiency since aggregation is used
* by vos_iterator
*
Expand All @@ -201,7 +202,7 @@ struct ilog_entries {
int
ilog_aggregate(struct umem_instance *umm, struct ilog_df *root,
const struct ilog_desc_cbs *cbs, const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched,
bool discard, daos_epoch_t punch_major, uint16_t punch_minor,
struct ilog_entries *entries);

/** Initialize an ilog_entries struct for fetch
Expand Down
15 changes: 9 additions & 6 deletions src/vos/tests/vts_ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ ilog_test_aggregate(void **state)
commit_all();
epr.epr_lo = 2;
epr.epr_hi = 4;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -895,7 +895,7 @@ ilog_test_aggregate(void **state)

epr.epr_lo = 0;
epr.epr_hi = 6;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -910,7 +910,7 @@ ilog_test_aggregate(void **state)
version_cache_fetch(&version_cache, loh, true);
commit_all();
epr.epr_hi = 7;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0,
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, false, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate log entry\n");
Expand Down Expand Up @@ -987,7 +987,8 @@ ilog_test_discard(void **state)
commit_all();
epr.epr_lo = 2;
epr.epr_hi = 4;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
LOG_FAIL(rc, 0, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);

Expand All @@ -1009,7 +1010,8 @@ ilog_test_discard(void **state)
commit_all();
epr.epr_lo = 0;
epr.epr_hi = 6;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand All @@ -1026,7 +1028,8 @@ ilog_test_discard(void **state)
commit_all();

epr.epr_hi = 7;
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, &ilents);
rc = ilog_aggregate(umm, ilog, &ilog_callbacks, &epr, true, 0, 0,
&ilents);
/* 1 means empty */
LOG_FAIL(rc, 1, "Failed to aggregate ilog\n");
version_cache_fetch(&version_cache, loh, true);
Expand Down
42 changes: 40 additions & 2 deletions src/vos/vos_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ struct vos_agg_param {
daos_epoch_t ap_max_epoch;
/* EV tree: Merge window for evtree aggregation */
struct agg_merge_window ap_window;
bool ap_skip_akey;
bool ap_skip_dkey;
bool ap_skip_obj;
};

static inline void
Expand Down Expand Up @@ -1824,6 +1827,14 @@ vos_aggregate_pre_cb(daos_handle_t ih, vos_iter_entry_t *entry,
*acts |= VOS_ITER_CB_ABORT;
if (rc == -DER_CSUM)
agg_param->ap_csum_err = true;
if (rc == -DER_TX_BUSY) {
/** Must not aggregate anything above this
* entry to avoid orphaned updates
*/
agg_param->ap_skip_akey = true;
agg_param->ap_skip_dkey = true;
agg_param->ap_skip_obj = true;
}
rc = 0;
}
break;
Expand Down Expand Up @@ -1876,10 +1887,22 @@ vos_aggregate_post_cb(daos_handle_t ih, vos_iter_entry_t *entry,

switch (type) {
case VOS_ITER_OBJ:
if (agg_param->ap_skip_obj) {
agg_param->ap_skip_obj = false;
break;
}
rc = oi_iter_aggregate(ih, agg_param->ap_discard);
break;
case VOS_ITER_DKEY:
if (agg_param->ap_skip_dkey) {
agg_param->ap_skip_dkey = false;
break;
}
case VOS_ITER_AKEY:
if (agg_param->ap_skip_dkey) {
agg_param->ap_skip_dkey = false;
break;
}
rc = vos_obj_iter_aggregate(ih, agg_param->ap_discard);
break;
case VOS_ITER_SINGLE:
Expand All @@ -1902,10 +1925,25 @@ vos_aggregate_post_cb(daos_handle_t ih, vos_iter_entry_t *entry,
* -DER_TX_BUSY error indicates current ilog aggregation
* aborted on hitting uncommitted entry, this should be a very
* rare case, we'd suppress the error here to keep aggregation
* moving forward.
* moving forward. We do, however, need to ensure we do not
* aggregate anything in the parent path. Otherwise, we could
* orphan the current entry due to incarnation log semantics.
*/
if (rc == -DER_TX_BUSY)
if (rc == -DER_TX_BUSY) {
rc = 0;
switch (type) {
default:
D_ASSERTF(type == VOS_ITER_OBJ,
"Invalid iter type\n");
break;
case VOS_ITER_AKEY:
agg_param->ap_skip_dkey = true;
/* fall through */
case VOS_ITER_DKEY:
agg_param->ap_skip_obj = true;
/* fall through */
}
}
}

return rc;
Expand Down
11 changes: 7 additions & 4 deletions src/vos/vos_ilog.c
Original file line number Diff line number Diff line change
Expand Up @@ -507,20 +507,23 @@ vos_ilog_punch_(struct vos_container *cont, struct ilog_df *ilog,
int
vos_ilog_aggregate(daos_handle_t coh, struct ilog_df *ilog,
const daos_epoch_range_t *epr,
bool discard, daos_epoch_t punched,
bool discard, const struct vos_punch_record *parent_punch,
struct vos_ilog_info *info)
{
struct vos_container *cont = vos_hdl2cont(coh);
struct umem_instance *umm = vos_cont2umm(cont);
struct ilog_desc_cbs cbs;
struct vos_punch_record punch_rec = {punched, 0};
struct vos_punch_record punch_rec = {0, 0};
int rc;

if (parent_punch)
punch_rec = *parent_punch;

vos_ilog_desc_cbs_init(&cbs, coh);
D_DEBUG(DB_TRACE, "log="DF_X64"\n", umem_ptr2off(umm, ilog));

rc = ilog_aggregate(umm, ilog, &cbs, epr, discard, punched,
&info->ii_entries);
rc = ilog_aggregate(umm, ilog, &cbs, epr, discard, punch_rec.pr_epc,
punch_rec.pr_minor_epc, &info->ii_entries);

if (rc != 0)
return rc;
Expand Down
3 changes: 2 additions & 1 deletion src/vos/vos_ilog.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ vos_ilog_desc_cbs_init(struct ilog_desc_cbs *cbs, daos_handle_t coh);
int
vos_ilog_aggregate(daos_handle_t coh, struct ilog_df *ilog,
const daos_epoch_range_t *epr, bool discard,
daos_epoch_t punched, struct vos_ilog_info *info);
const struct vos_punch_record *parent_punch,
struct vos_ilog_info *info);

/* #define ILOG_TRACE */
#ifdef ILOG_TRACE
Expand Down
2 changes: 1 addition & 1 deletion src/vos/vos_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,7 @@ vos_obj_iter_aggregate(daos_handle_t ih, bool discard)

rc = vos_ilog_aggregate(vos_cont2hdl(obj->obj_cont), &krec->kr_ilog,
&oiter->it_epr, discard,
oiter->it_punched.pr_epc, &oiter->it_ilog_info);
&oiter->it_punched, &oiter->it_ilog_info);

if (rc == 1) {
/* Incarnation log is empty so delete the key */
Expand Down
2 changes: 1 addition & 1 deletion src/vos/vos_obj_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ oi_iter_aggregate(daos_handle_t ih, bool discard)
goto exit;

rc = vos_ilog_aggregate(vos_cont2hdl(oiter->oit_cont), &obj->vo_ilog,
&oiter->oit_epr, discard, 0,
&oiter->oit_epr, discard, NULL,
&oiter->oit_ilog_info);
if (rc == 1) {
/* Incarnation log is empty, delete the object */
Expand Down

0 comments on commit 318aaf1

Please sign in to comment.