diff --git a/src/container/srv_target.c b/src/container/srv_target.c
index 653cb5f7d37a..71d13f637bd0 100644
--- a/src/container/srv_target.c
+++ b/src/container/srv_target.c
@@ -321,7 +321,7 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
 		     DAOS_FAIL_CHECK(DAOS_FORCE_EC_AGG_PEER_FAIL)))
 		interval = 0;
 	else
-		interval = d_sec2hlc(DAOS_AGG_THRESHOLD);
+		interval = cont->sc_agg_eph_gap;
 
 	D_ASSERT(hlc > (interval * 2));
 	/*
@@ -409,6 +409,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
 			DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
 			tgt_id, epoch_range.epr_lo, epoch_range.epr_hi);
 
+		if (!param->ap_vos_agg)
+			vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi);
+
 		flags |= VOS_AGG_FL_FORCE_MERGE;
 		rc = agg_cb(cont, &epoch_range, flags, param);
 		if (rc)
@@ -425,6 +428,9 @@ cont_child_aggregate(struct ds_cont_child *cont, cont_aggregate_cb_t agg_cb,
 		DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
 		tgt_id, epoch_range.epr_lo, epoch_range.epr_hi);
 
+	if (!param->ap_vos_agg)
+		vos_cont_set_mod_bound(cont->sc_hdl, epoch_range.epr_hi);
+
 	if (dss_xstream_is_busy())
 		flags &= ~VOS_AGG_FL_FORCE_MERGE;
 	rc = agg_cb(cont, &epoch_range, flags, param);
diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c
index 1ee74ae11a44..96e8119d4ce8 100644
--- a/src/dtx/dtx_common.c
+++ b/src/dtx/dtx_common.c
@@ -922,6 +922,7 @@ dtx_handle_init(struct dtx_id *dti, daos_handle_t xoh, struct dtx_epoch *epoch,
 	dth->dth_for_migration = (flags & DTX_FOR_MIGRATION) ? 1 : 0;
 	dth->dth_ignore_uncommitted = (flags & DTX_IGNORE_UNCOMMITTED) ? 1 : 0;
 	dth->dth_prepared = (flags & DTX_PREPARED) ? 1 : 0;
+	dth->dth_epoch_owner = (flags & DTX_EPOCH_OWNER) ? 1 : 0;
 	dth->dth_aborted = 0;
 	dth->dth_already = 0;
 	dth->dth_need_validation = 0;
@@ -1853,6 +1854,8 @@ dtx_cont_register(struct ds_cont_child *cont)
 		D_GOTO(out, rc = -DER_NOMEM);
 	}
 
+	cont->sc_agg_eph_gap = d_sec2hlc(vos_get_agg_gap());
+
 	ds_cont_child_get(cont);
 	dbca->dbca_refs = 0;
 	dbca->dbca_cont = cont;
diff --git a/src/dtx/tests/dts_structs.c b/src/dtx/tests/dts_structs.c
index dc4347fed7cc..a763546f824e 100644
--- a/src/dtx/tests/dts_structs.c
+++ b/src/dtx/tests/dts_structs.c
@@ -70,8 +70,9 @@ struct_dtx_handle(void **state)
 	SET_BITFIELD_1(dummy, dth_need_validation);
 	SET_BITFIELD_1(dummy, dth_ignore_uncommitted);
 	SET_BITFIELD_1(dummy, dth_local);
+	SET_BITFIELD_1(dummy, dth_epoch_owner);
 	SET_BITFIELD_1(dummy, dth_local_complete);
-	SET_BITFIELD(dummy, padding1, 13);
+	SET_BITFIELD(dummy, padding1, 12);
 
 	SET_FIELD(dummy, dth_dti_cos_count);
 	SET_FIELD(dummy, dth_dti_cos);
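A note on the arithmetic above: the aggregation upper bound and the new modification bound are the same value, derived by subtracting the per-container gap from the current HLC. A minimal standalone model of that relation (plain C; `hlc_get()`/`sec2hlc()` are simplified stand-ins for the DAOS `d_hlc_get()`/`d_sec2hlc()` helpers, assumed here only for illustration):

```c
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Simplified stand-ins for the DAOS HLC helpers: the real HLC packs the
 * physical clock into the high bits; plain nanoseconds are enough to
 * illustrate the arithmetic. */
static uint64_t hlc_get(void)       { return (uint64_t)time(NULL) * 1000000000ULL; }
static uint64_t sec2hlc(uint64_t s) { return s * 1000000000ULL; }

int main(void)
{
	uint64_t agg_gap_sec = 60;	/* VOS_AGG_GAP_DEF */
	uint64_t sc_agg_eph_gap = sec2hlc(agg_gap_sec);
	uint64_t hlc = hlc_get();

	/* Aggregation never goes above 'current HLC - gap' ... */
	uint64_t epr_hi = hlc - sc_agg_eph_gap;

	/* ... and before aggregating, the same value is published as the
	 * lowest acceptable modification epoch, so an incoming update with
	 * an epoch at or below epr_hi is restarted instead of racing with
	 * aggregation. */
	uint64_t mod_bound = epr_hi;

	printf("hlc=%llu agg_hi=%llu mod_bound=%llu\n",
	       (unsigned long long)hlc, (unsigned long long)epr_hi,
	       (unsigned long long)mod_bound);
	return 0;
}
```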
diff --git a/src/engine/sched.c b/src/engine/sched.c
index 49a46ca3618d..807f150839e2 100644
--- a/src/engine/sched.c
+++ b/src/engine/sched.c
@@ -197,17 +197,6 @@ enum {
 
 static int	sched_policy;
 
-/*
- * Time threshold for giving IO up throttling. If space pressure stays in the
- * highest level for enough long time, we assume that no more space can be
- * reclaimed and choose to give up IO throttling, so that ENOSPACE error could
- * be returned to client earlier.
- *
- * To make time for aggregation reclaiming overwriteen space, this threshold
- * should be longer than the DAOS_AGG_THRESHOLD.
- */
-#define SCHED_DELAY_THRESH	40000	/* msecs */
-
 struct pressure_ratio {
 	unsigned int	pr_free;	/* free space ratio */
 	unsigned int	pr_gc_ratio;	/* CPU percentage for GC & Aggregation */
@@ -943,12 +932,21 @@ is_gc_pending(struct sched_pool_info *spi)
 	return spi->spi_gc_ults && (spi->spi_gc_ults > spi->spi_gc_sleeping);
 }
 
-/* Just run into this space pressure situation recently? */
+/*
+ * Did we run into this space pressure situation only recently?
+ *
+ * If space pressure stays at the highest level for long enough, we assume
+ * that no more space can be reclaimed and give up IO throttling, so that
+ * the ENOSPACE error can be returned to the client earlier.
+ *
+ * To leave time for aggregation to reclaim overwritten space, this threshold
+ * should be longer than the VOS aggregation epoch gap from the current HLC.
+ */
 static inline bool
 is_pressure_recent(struct sched_info *info, struct sched_pool_info *spi)
 {
 	D_ASSERT(info->si_cur_ts >= spi->spi_pressure_ts);
-	return (info->si_cur_ts - spi->spi_pressure_ts) < SCHED_DELAY_THRESH;
+	return (info->si_cur_ts - spi->spi_pressure_ts) < info->si_agg_gap;
 }
 
 static inline uint64_t
@@ -2256,6 +2254,8 @@ sched_run(ABT_sched sched)
 		return;
 	}
 
+	dx->dx_sched_info.si_agg_gap = (vos_get_agg_gap() + 10) * 1000; /* msecs */
+
 	while (1) {
 		/* Try to pick network poll ULT */
 		pool = pools[DSS_POOL_NET_POLL];
diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h
index 222f07e49060..d1f240270fbb 100644
--- a/src/engine/srv_internal.h
+++ b/src/engine/srv_internal.h
@@ -61,6 +61,7 @@ struct sched_info {
 	/* Number of kicked requests for each type in current cycle */
 	uint32_t	si_kicked_req_cnt[SCHED_REQ_MAX];
 	unsigned int	si_stop:1;
+	uint64_t	si_agg_gap;
 };
 
 struct mem_stats {
diff --git a/src/include/daos/dtx.h b/src/include/daos/dtx.h
index ca719077a143..17497565c16c 100644
--- a/src/include/daos/dtx.h
+++ b/src/include/daos/dtx.h
@@ -27,17 +27,6 @@
 /* The time (in second) threshold for batched DTX commit. */
 #define DTX_COMMIT_THRESHOLD_AGE	10
 
-/*
- * VOS aggregation should try to avoid aggregating in the epoch range where
- * lots of data records are pending to commit, so the aggregation epoch upper
- * bound is: current HLC - (DTX batched commit threshold + buffer period)
- *
- * To avoid conflicting of aggregation vs. transactions, any transactional
- * update/fetch with epoch lower than the aggregation upper bound should be
- * rejected and restarted.
- */
-#define DAOS_AGG_THRESHOLD	(DTX_COMMIT_THRESHOLD_AGE + 10)	/* seconds */
-
 enum dtx_target_flags {
 	/* The target only contains read-only operations for the DTX. */
 	DTF_RDONLY			= (1 << 0),
diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h
index 9fc615c2a8b7..14953a1972ae 100644
--- a/src/include/daos_srv/container.h
+++ b/src/include/daos_srv/container.h
@@ -129,6 +129,12 @@ struct ds_cont_child {
 	 */
 	uint64_t		 sc_ec_update_timestamp;
 
+	/*
+	 * The gap between the max allowed aggregation epoch and the current HLC. A modification
+	 * with an older epoch outside this range may conflict with aggregation and be rejected.
+	 */
+	uint64_t		 sc_agg_eph_gap;
+
 	/* The objects with committable DTXs in DRAM. */
 	daos_handle_t		 sc_dtx_cos_hdl;
 	/* The DTX COS-btree. */
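Worked numbers for the scheduler change above: with the default gap of 60 seconds, the cutoff becomes (60 + 10) * 1000 = 70000 msecs, versus the old fixed 40000. A small self-contained sketch of the comparison performed by `is_pressure_recent()`:

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Model of the new throttling cutoff: the old fixed SCHED_DELAY_THRESH
 * (40000 msecs) is replaced by a value derived from the VOS aggregation
 * gap, so the two knobs can no longer drift apart. */
static bool
pressure_recent(uint64_t cur_ts, uint64_t pressure_ts, uint64_t agg_gap_sec)
{
	uint64_t si_agg_gap = (agg_gap_sec + 10) * 1000; /* msecs */

	return (cur_ts - pressure_ts) < si_agg_gap;
}

int main(void)
{
	/* With the default 60 second gap the cutoff is 70000 msecs:
	 * 50 seconds of sustained pressure still throttles (1),
	 * 80 seconds no longer does (0). */
	printf("%d %d\n", pressure_recent(80000, 30000, 60),
	       pressure_recent(110000, 30000, 60));
	return 0;
}
```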
diff --git a/src/include/daos_srv/dtx_srv.h b/src/include/daos_srv/dtx_srv.h
index 7c60d2deaa03..f648b42d7be6 100644
--- a/src/include/daos_srv/dtx_srv.h
+++ b/src/include/daos_srv/dtx_srv.h
@@ -113,8 +113,10 @@ struct dtx_handle {
 				 dth_ignore_uncommitted	: 1,
 				 /* Local transaction */
 				 dth_local		: 1,
+				 /* The epoch is generated locally. */
+				 dth_epoch_owner	: 1,
 				 /* Flag to commit the local transaction */
-				 dth_local_complete	: 1, padding1 : 13;
+				 dth_local_complete	: 1, padding1 : 12;
 
 	/* The count the DTXs in the dth_dti_cos array. */
 	uint32_t		 dth_dti_cos_count;
@@ -287,6 +289,8 @@ enum dtx_flags {
 	DTX_RELAY		= (1 << 10),
 	/** Local transaction */
 	DTX_LOCAL		= (1 << 11),
+	/** The epoch is generated locally. */
+	DTX_EPOCH_OWNER		= (1 << 12),
 };
 
 void
diff --git a/src/include/daos_srv/vos.h b/src/include/daos_srv/vos.h
index b6287c2986e1..e34ade044dd3 100644
--- a/src/include/daos_srv/vos.h
+++ b/src/include/daos_srv/vos.h
@@ -938,6 +938,56 @@ vos_update_renew_epoch(daos_handle_t ioh, struct dtx_handle *dth);
 void
 vos_dtx_renew_epoch(struct dtx_handle *dth);
 
+/**
+ * Calculate the current locally known stable epoch for the given container.
+ *
+ * \param coh	[IN]	Container open handle
+ *
+ * \return	The epoch on success, negative value on error.
+ */
+daos_epoch_t
+vos_cont_get_local_stable_epoch(daos_handle_t coh);
+
+/**
+ * Get the global stable epoch for the given container.
+ *
+ * \param coh	[IN]	Container open handle
+ *
+ * \return	The epoch on success, negative value on error.
+ */
+daos_epoch_t
+vos_cont_get_global_stable_epoch(daos_handle_t coh);
+
+/**
+ * Set the global stable epoch for the given container.
+ *
+ * \param coh	[IN]	Container open handle
+ * \param epoch	[IN]	The epoch to be used as the new global stable epoch.
+ *
+ * \return	Zero on success, negative value on error.
+ */
+int
+vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch);
+
+/**
+ * Set the lowest allowed modification epoch for the given container.
+ *
+ * \param coh	[IN]	Container open handle
+ * \param epoch	[IN]	The lowest allowed epoch for modification.
+ *
+ * \return	Zero on success, negative value on error.
+ */
+int
+vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch);
+
+/**
+ * Query the gap between the max allowed aggregation epoch and the current HLC.
+ *
+ * \return	The gap value in seconds.
+ */
+uint32_t
+vos_get_agg_gap(void);
+
 /**
  * Get the recx/epoch list.
 *
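The intended division of labor between these new calls (per the comment later in vos_container.c) might look like the following leader-side sketch; `refresh_global_stable_epoch()` and the way the shard-local epochs reach the leader are hypothetical, not part of this patch:

```c
#include <daos_srv/vos.h>

/*
 * Hypothetical leader-side reduction (sketch): each container shard reports
 * vos_cont_get_local_stable_epoch(), the leader takes the minimum, and the
 * result is pushed back via vos_cont_set_global_stable_epoch().  The RPC
 * transport that would carry 'locals[]' between engines is elided.
 */
static int
refresh_global_stable_epoch(daos_handle_t *cohs, daos_epoch_t *locals, int nr)
{
	daos_epoch_t	global = locals[0];
	int		rc = 0;
	int		i;

	for (i = 1; i < nr; i++) {
		if (locals[i] < global)
			global = locals[i];
	}

	/* Dispatch the minimum to every shard; the callee refuses both a
	 * rollback and a value above its own local stable epoch. */
	for (i = 0; i < nr && rc == 0; i++)
		rc = vos_cont_set_global_stable_epoch(cohs[i], global);

	return rc;
}
```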
diff --git a/src/include/daos_srv/vos_types.h b/src/include/daos_srv/vos_types.h
index 0a52851c390d..fa173cf1f63e 100644
--- a/src/include/daos_srv/vos_types.h
+++ b/src/include/daos_srv/vos_types.h
@@ -58,6 +58,8 @@ enum dtx_entry_flags {
 	 * on all yet, need to be re-committed.
 	 */
 	DTE_PARTIAL_COMMITTED	= (1 << 5),
+	/* The DTX epoch is sorted locally. */
+	DTE_EPOCH_SORTED	= (1 << 6),
 };
 
 struct dtx_entry {
diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c
index a79cec03f6fe..2654df4d5f98 100644
--- a/src/object/srv_obj.c
+++ b/src/object/srv_obj.c
@@ -2896,8 +2896,10 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
 
 	rc = process_epoch(&orw->orw_epoch, &orw->orw_epoch_first,
 			   &orw->orw_flags);
-	if (rc == PE_OK_LOCAL)
+	if (rc == PE_OK_LOCAL) {
 		orw->orw_flags &= ~ORF_EPOCH_UNCERTAIN;
+		dtx_flags |= DTX_EPOCH_OWNER;
+	}
 
 	if (obj_rpc_is_fetch(rpc)) {
 		struct dtx_handle	*dth;
@@ -3856,8 +3858,10 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
 
 	rc = process_epoch(&opi->opi_epoch, NULL /* epoch_first */,
 			   &opi->opi_flags);
-	if (rc == PE_OK_LOCAL)
+	if (rc == PE_OK_LOCAL) {
 		opi->opi_flags &= ~ORF_EPOCH_UNCERTAIN;
+		dtx_flags |= DTX_EPOCH_OWNER;
+	}
 
 	version = opi->opi_map_ver;
 	max_ver = opi->opi_map_ver;
@@ -5110,6 +5114,7 @@ ds_obj_dtx_leader(struct daos_cpd_args *dca)
 			   &dcsh->dcsh_epoch.oe_first,
 			   &dcsh->dcsh_epoch.oe_rpc_flags);
 	if (rc == PE_OK_LOCAL) {
+		dtx_flags |= DTX_EPOCH_OWNER;
 		/*
 		 * In this case, writes to local RDGs can use the chosen epoch
 		 * without any uncertainty. This optimization is left to future
@@ -5701,8 +5706,10 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)
 	if (ocpi->ocpi_flags & ORF_LEADER) {
 		rc = process_epoch(&ocpi->ocpi_epoch, NULL /* epoch_first */,
 				   &ocpi->ocpi_flags);
-		if (rc == PE_OK_LOCAL)
+		if (rc == PE_OK_LOCAL) {
 			ocpi->ocpi_flags &= ~ORF_EPOCH_UNCERTAIN;
+			dtx_flags |= DTX_EPOCH_OWNER;
+		}
 	} else if (dct_nr == 1) {
 		rc = obj_coll_local(rpc, dcts[0].dct_shards, dce, &version, &ioc,
 				    NULL, odm->odm_mbs, obj_coll_tgt_punch);
diff --git a/src/tests/ftest/util/server_utils_params.py b/src/tests/ftest/util/server_utils_params.py
index 46db48912200..a113956ce73a 100644
--- a/src/tests/ftest/util/server_utils_params.py
+++ b/src/tests/ftest/util/server_utils_params.py
@@ -436,6 +436,7 @@ class EngineYamlParameters(YamlParameters):
                 "D_LOG_FILE_APPEND_PID=1",
                 "DAOS_POOL_RF=4",
                 "CRT_EVENT_DELAY=1",
+                "DAOS_VOS_AGG_GAP=20",
                 "COVFILE=/tmp/test.cov"],
             "ofi+tcp": [],
             "ofi+tcp;ofi_rxm": [],
diff --git a/src/vos/tests/vts_dtx.c b/src/vos/tests/vts_dtx.c
index bd54dd52838c..8859405686d7 100644
--- a/src/vos/tests/vts_dtx.c
+++ b/src/vos/tests/vts_dtx.c
@@ -66,6 +66,7 @@ vts_dtx_begin(const daos_unit_oid_t *oid, daos_handle_t coh, daos_epoch_t epoch,
 	dth->dth_for_migration = 0;
 	dth->dth_ignore_uncommitted = 0;
 	dth->dth_prepared = 0;
+	dth->dth_epoch_owner = 0;
 	dth->dth_aborted = 0;
 	dth->dth_already = 0;
 	dth->dth_need_validation = 0;
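How the new flag travels end to end: the leader that generated the epoch itself (`process_epoch()` returned `PE_OK_LOCAL`) sets `DTX_EPOCH_OWNER`, `dtx_handle_init()` latches it into `dth_epoch_owner`, and `vos_dtx_alloc()` later uses it to pick the sorted list. A toy model of just the latching step, with everything else elided:

```c
#include <stdint.h>
#include <stdio.h>

/* Minimal model using names from the patch: only the engine that generated
 * the epoch sets DTX_EPOCH_OWNER, so only its entries are guaranteed to
 * arrive in epoch order and may be linked on the sorted list. */
#define DTX_EPOCH_OWNER	(1 << 12)

struct tiny_dth {
	uint32_t dth_epoch_owner:1;
};

static void
tiny_handle_init(struct tiny_dth *dth, uint32_t flags)
{
	dth->dth_epoch_owner = (flags & DTX_EPOCH_OWNER) ? 1 : 0;
}

int main(void)
{
	struct tiny_dth	dth;

	/* process_epoch() returned PE_OK_LOCAL on the leader ... */
	uint32_t flags = DTX_EPOCH_OWNER;

	tiny_handle_init(&dth, flags);
	printf("sorted list? %s\n", dth.dth_epoch_owner ? "yes" : "no");
	return 0;
}
```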
diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c
index e19768d4c032..c9da2f934001 100644
--- a/src/vos/vos_common.c
+++ b/src/vos/vos_common.c
@@ -620,6 +620,42 @@ struct dss_module_key vos_module_key = {
 
 daos_epoch_t	vos_start_epoch = DAOS_EPOCH_MAX;
 
+/*
+ * For a standalone transaction started on the DTX leader, its epoch is
+ * generated by the leader, then the modification RPC is forwarded to other
+ * related non-leader(s). If the forwarded RPC is delayed for some reason,
+ * such as network congestion or a busy non-leader, the transaction epoch
+ * may become very old (exceeding the related threshold) and VOS aggregation
+ * may have already aggregated the related epoch range. In that case, the
+ * non-leader will reject the modification to avoid data loss or corruption.
+ *
+ * For a distributed transaction, if there is no read (fetch, query,
+ * enumerate, and so on) before client tx_commit, then the related DTX leader
+ * generates the epoch for the transaction after client commit_tx. Epoch
+ * handling is then the same as for the standalone transaction above.
+ *
+ * If the distributed transaction involves some read before client commit_tx,
+ * its epoch is generated by the first engine accessed for read. If the
+ * transaction takes too long after that, then by the time of client
+ * commit_tx its epoch may have become so old that the related DTX leader
+ * has to reject the transaction to avoid the conflict mentioned above. And
+ * even if the DTX leader did not reject the transaction, some non-leader
+ * may still reject it because of the very old epoch. So under this
+ * framework, the life of a distributed transaction cannot be too long. The
+ * limit can be adjusted via the server side environment variable DAOS_VOS_AGG_GAP.
+ *
+ * NOTE: EC/VOS aggregation should avoid aggregating in the epoch range where
+ *       lots of data records are pending to commit, so the aggregation epoch
+ *       upper bound is 'current HLC - vos_agg_gap'.
+ */
+uint32_t	vos_agg_gap;
+
+uint32_t
+vos_get_agg_gap(void)
+{
+	return vos_agg_gap;
+}
+
 static int
 vos_mod_init(void)
 {
@@ -679,6 +715,15 @@ vos_mod_init(void)
 	d_getenv_bool("DAOS_DKEY_PUNCH_PROPAGATE", &vos_dkey_punch_propagate);
 	D_INFO("DKEY punch propagation is %s\n", vos_dkey_punch_propagate ? "enabled" : "disabled");
 
+	vos_agg_gap = VOS_AGG_GAP_DEF;
+	d_getenv_uint("DAOS_VOS_AGG_GAP", &vos_agg_gap);
+	if (vos_agg_gap < VOS_AGG_GAP_MIN || vos_agg_gap > VOS_AGG_GAP_MAX) {
+		D_WARN("Invalid DAOS_VOS_AGG_GAP value, "
+		       "valid range [%u, %u], set it as default %u (second)\n",
+		       VOS_AGG_GAP_MIN, VOS_AGG_GAP_MAX, VOS_AGG_GAP_DEF);
+		vos_agg_gap = VOS_AGG_GAP_DEF;
+	}
+	D_INFO("Set DAOS VOS aggregation gap as %u (second)\n", vos_agg_gap);
 
 	return rc;
 }
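The validation in `vos_mod_init()` clamps the tunable to [20, 180] seconds with a default of 60. A standalone approximation of that parsing, using plain `getenv()`/`strtoul()` instead of `d_getenv_uint()` (a simplification for the sketch):

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define VOS_AGG_GAP_MIN	20	/* seconds */
#define VOS_AGG_GAP_DEF	60
#define VOS_AGG_GAP_MAX	180

/* Standalone model of the tunable's validation: unset, unparsable, or
 * out-of-range values fall back to the default, matching vos_mod_init(). */
static uint32_t
parse_agg_gap(void)
{
	const char *val = getenv("DAOS_VOS_AGG_GAP");
	uint32_t    gap = VOS_AGG_GAP_DEF;

	if (val != NULL)
		gap = (uint32_t)strtoul(val, NULL, 10);

	if (gap < VOS_AGG_GAP_MIN || gap > VOS_AGG_GAP_MAX)
		gap = VOS_AGG_GAP_DEF;

	return gap;
}

int main(void)
{
	printf("effective gap: %u seconds\n", parse_agg_gap());
	return 0;
}
```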
diff --git a/src/vos/vos_container.c b/src/vos/vos_container.c
index a5a55a902b9e..9dc3d3d626b8 100644
--- a/src/vos/vos_container.c
+++ b/src/vos/vos_container.c
@@ -198,6 +198,9 @@ cont_free_internal(struct vos_container *cont)
 	lrua_array_free(cont->vc_dtx_array);
 
 	D_ASSERT(d_list_empty(&cont->vc_dtx_act_list));
+	D_ASSERT(d_list_empty(&cont->vc_dtx_sorted_list));
+	D_ASSERT(d_list_empty(&cont->vc_dtx_unsorted_list));
+	D_ASSERT(d_list_empty(&cont->vc_dtx_reindex_list));
 
 	dbtree_close(cont->vc_btr_hdl);
 
@@ -395,6 +398,9 @@ vos_cont_open(daos_handle_t poh, uuid_t co_uuid, daos_handle_t *coh)
 	cont->vc_cmt_dtx_indexed = 0;
 	cont->vc_cmt_dtx_reindex_pos = cont->vc_cont_df->cd_dtx_committed_head;
 	D_INIT_LIST_HEAD(&cont->vc_dtx_act_list);
+	D_INIT_LIST_HEAD(&cont->vc_dtx_sorted_list);
+	D_INIT_LIST_HEAD(&cont->vc_dtx_unsorted_list);
+	D_INIT_LIST_HEAD(&cont->vc_dtx_reindex_list);
 	cont->vc_dtx_committed_count = 0;
 	cont->vc_solo_dtx_epoch = d_hlc_get();
 	rc = gc_open_cont(cont);
@@ -815,3 +821,173 @@ struct vos_iter_ops vos_cont_iter_ops = {
 	.iop_fetch = cont_iter_fetch,
 	.iop_process = cont_iter_process,
 };
+
+/*
+ * The local stable epoch can be used to calculate the global stable epoch: all the container
+ * shards report their own local stable epoch to some leader, which finds the smallest one as
+ * the global stable epoch and dispatches it to all related container shards.
+ */
+daos_epoch_t
+vos_cont_get_local_stable_epoch(daos_handle_t coh)
+{
+	struct vos_container	*cont;
+	struct vos_cont_ext_df	*cont_ext;
+	struct vos_dtx_act_ent	*dae;
+	uint64_t		 gap = d_sec2hlc(vos_agg_gap);
+	daos_epoch_t		 epoch = d_hlc_get() - gap;
+
+	cont = vos_hdl2cont(coh);
+	D_ASSERT(cont != NULL);
+
+	/*
+	 * If the oldest sorted DTX (that is at the head of the sorted list) has an
+	 * epoch at or below the default boundary, clamp the local stable epoch to
+	 * just below that DTX's epoch.
+	 */
+	if (!d_list_empty(&cont->vc_dtx_sorted_list)) {
+		dae = d_list_entry(cont->vc_dtx_sorted_list.next,
+				   struct vos_dtx_act_ent, dae_order_link);
+		if (epoch >= DAE_EPOCH(dae))
+			epoch = DAE_EPOCH(dae) - 1;
+	}
+
+	/*
+	 * It is not easy to know which DTX is the oldest one in the unsorted list.
+	 * The entry right after the head of the list may be older than the head.
+	 * But the epoch difference will NOT exceed 'vos_agg_gap', since any DTX
+	 * with an older epoch would have been rejected (and restarted with a newer
+	 * epoch).
+	 *
+	 * So "DAE_EPOCH(head) - vos_agg_gap" can be used to estimate the local
+	 * stable epoch for unsorted DTX entries.
+	 */
+	if (!d_list_empty(&cont->vc_dtx_unsorted_list)) {
+		dae = d_list_entry(cont->vc_dtx_unsorted_list.next,
+				   struct vos_dtx_act_ent, dae_order_link);
+		if (epoch > DAE_EPOCH(dae) - gap)
+			epoch = DAE_EPOCH(dae) - gap;
+	}
+
+	/*
+	 * The historical vos_agg_gap for the DTX entries in the reindex list is unknown.
+	 * We use cont->vc_dtx_reindex_eph_diff to estimate the local stable epoch. That
+	 * may be over-estimated. Usually, the count of re-indexed DTX entries is quite
+	 * limited, and they will be purged soon after the container is opened (via DTX
+	 * resync), so they do not much affect the local stable epoch calculation.
+	 */
+	if (unlikely(!d_list_empty(&cont->vc_dtx_reindex_list))) {
+		dae = d_list_entry(cont->vc_dtx_reindex_list.next,
+				   struct vos_dtx_act_ent, dae_order_link);
+		if (epoch > DAE_EPOCH(dae) - cont->vc_dtx_reindex_eph_diff)
+			epoch = DAE_EPOCH(dae) - cont->vc_dtx_reindex_eph_diff;
+	}
+
+	/* The new local stable epoch cannot be smaller than the old global stable epoch. */
+	cont_ext = umem_off2ptr(vos_cont2umm(cont), cont->vc_cont_df->cd_ext);
+	if (cont_ext != NULL && epoch < cont_ext->ced_global_stable_epoch)
+		epoch = cont_ext->ced_global_stable_epoch;
+
+	D_ASSERT(cont->vc_local_stable_epoch <= epoch);
+	cont->vc_local_stable_epoch = epoch;
+
+	return epoch;
+}
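A worked example of how the three lists fold into one local stable epoch. The helper below is a simplified model of `vos_cont_get_local_stable_epoch()` with small integer epochs instead of HLC values and the list heads passed as plain parameters (0 meaning "list empty"):

```c
#include <stdint.h>
#include <stdio.h>

/* Sketch of the candidate folding: base value 'hlc - gap', lowered by the
 * sorted head, the unsorted head estimate, and the reindex estimate, then
 * lifted to at least the old global stable epoch. */
static uint64_t
local_stable_epoch(uint64_t hlc, uint64_t gap, uint64_t sorted_head,
		   uint64_t unsorted_head, uint64_t reindex_head,
		   uint64_t reindex_diff, uint64_t global_stable)
{
	uint64_t epoch = hlc - gap;

	if (sorted_head != 0 && epoch >= sorted_head)
		epoch = sorted_head - 1;	/* oldest sorted DTX bounds it */
	if (unsorted_head != 0 && epoch > unsorted_head - gap)
		epoch = unsorted_head - gap;	/* head may trail by < gap */
	if (reindex_head != 0 && epoch > reindex_head - reindex_diff)
		epoch = reindex_head - reindex_diff;
	if (epoch < global_stable)		/* never below the old global */
		epoch = global_stable;
	return epoch;
}

int main(void)
{
	/* hlc=1000, gap=60: base 940; sorted head 900 lowers it to 899;
	 * unsorted head 950 allows only 890, so 890 wins; no reindex list;
	 * old global 800 does not lift it.  Prints 890. */
	printf("%llu\n", (unsigned long long)
	       local_stable_epoch(1000, 60, 900, 950, 0, 0, 800));
	return 0;
}
```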
+
+/*
+ * The global stable epoch can be used for incremental reintegration: all the modifications
+ * involved in the current target (container shard) under the global stable epoch have been
+ * persistently stored globally; only modifications with newer epochs need to be considered
+ * when reintegrating into the system.
+ */
+daos_epoch_t
+vos_cont_get_global_stable_epoch(daos_handle_t coh)
+{
+	struct vos_container	*cont;
+	struct vos_cont_ext_df	*cont_ext;
+	daos_epoch_t		 epoch = 0;
+
+	cont = vos_hdl2cont(coh);
+	D_ASSERT(cont != NULL);
+
+	cont_ext = umem_off2ptr(vos_cont2umm(cont), cont->vc_cont_df->cd_ext);
+	if (cont_ext != NULL)
+		epoch = cont_ext->ced_global_stable_epoch;
+
+	return epoch;
+}
+
+int
+vos_cont_set_global_stable_epoch(daos_handle_t coh, daos_epoch_t epoch)
+{
+	struct umem_instance	*umm;
+	struct vos_container	*cont;
+	struct vos_cont_ext_df	*cont_ext;
+	daos_epoch_t		 old = 0;
+	int			 rc = 0;
+
+	cont = vos_hdl2cont(coh);
+	D_ASSERT(cont != NULL);
+
+	umm = vos_cont2umm(cont);
+	cont_ext = umem_off2ptr(umm, cont->vc_cont_df->cd_ext);
+
+	/* Do not allow setting the global stable epoch on an old container without extension. */
+	if (cont_ext == NULL)
+		D_GOTO(out, rc = -DER_NOTSUPPORTED);
+
+	/*
+	 * Either the leader gave a wrong global stable epoch or the current target did not
+	 * participate in calculating the new global stable epoch. In either case, do not
+	 * allow setting the global stable epoch.
+	 */
+	if (unlikely(cont->vc_local_stable_epoch < epoch)) {
+		D_WARN("Invalid global stable epoch: " DF_X64 " vs " DF_X64 " for container "
+		       DF_UUID "\n", cont->vc_local_stable_epoch, epoch, DP_UUID(cont->vc_id));
+		D_GOTO(out, rc = -DER_NO_PERM);
+	}
+
+	if (unlikely(cont_ext->ced_global_stable_epoch > epoch)) {
+		D_WARN("Do not allow to rollback global stable epoch from "
+		       DF_X64 " to " DF_X64 " for container " DF_UUID "\n",
+		       cont_ext->ced_global_stable_epoch, epoch, DP_UUID(cont->vc_id));
+		D_GOTO(out, rc = -DER_NO_PERM);
+	}
+
+	if (cont_ext->ced_global_stable_epoch == epoch)
+		D_GOTO(out, rc = 0);
+
+	old = cont_ext->ced_global_stable_epoch;
+	rc = umem_tx_begin(umm, NULL);
+	if (rc == 0) {
+		rc = umem_tx_add_ptr(umm, &cont_ext->ced_global_stable_epoch,
+				     sizeof(cont_ext->ced_global_stable_epoch));
+		if (rc == 0) {
+			cont_ext->ced_global_stable_epoch = epoch;
+			rc = umem_tx_commit(umm);
+		} else {
+			rc = umem_tx_abort(umm, rc);
+		}
+	}
+
+	DL_CDEBUG(rc != 0, DLOG_ERR, DB_MGMT, rc,
+		  "Set global stable epoch from " DF_X64 " to " DF_X64 " for container " DF_UUID,
+		  old, epoch, DP_UUID(cont->vc_id));
+
+out:
+	return rc;
+}
+
+int
+vos_cont_set_mod_bound(daos_handle_t coh, uint64_t epoch)
+{
+	struct vos_container	*cont;
+
+	cont = vos_hdl2cont(coh);
+	D_ASSERT(cont != NULL);
+
+	if (cont->vc_mod_epoch_bound < epoch) {
+		D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from "
+			DF_X64 " to " DF_X64 " for container " DF_UUID "\n",
+			cont->vc_mod_epoch_bound, epoch, DP_UUID(cont->vc_id));
+		cont->vc_mod_epoch_bound = epoch;
+	}
+
+	return 0;
+}
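A possible shard-side caller of the setter, treating -DER_NO_PERM (stale leader, attempted rollback, or a value above our own local stable epoch) as a transient disagreement rather than a failure. This wrapper is hypothetical, not part of the patch:

```c
#include <daos_srv/vos.h>
#include <daos_errno.h>

/*
 * Sketch: accept the leader's value, but let -DER_NO_PERM pass as
 * non-fatal; a later aggregation round is expected to converge.
 */
static int
apply_global_stable_epoch(daos_handle_t coh, daos_epoch_t from_leader)
{
	int rc;

	rc = vos_cont_set_global_stable_epoch(coh, from_leader);
	if (rc == -DER_NO_PERM)
		return 0;	/* ignore; wait for the next round */

	return rc;
}
```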
diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c
index 86c100f47393..55a91d8bbff9 100644
--- a/src/vos/vos_dtx.c
+++ b/src/vos/vos_dtx.c
@@ -261,8 +261,10 @@ dtx_act_ent_free(struct btr_instance *tins, struct btr_record *rec,
 	dae = umem_off2ptr(&tins->ti_umm, rec->rec_off);
 	rec->rec_off = UMOFF_NULL;
 
-	if (dae != NULL)
+	if (dae != NULL) {
+		d_list_del_init(&dae->dae_order_link);
 		d_list_del_init(&dae->dae_link);
+	}
 
 	if (args != NULL) {
 		/* Return the record addreass (offset in DRAM).
@@ -995,6 +997,62 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth)
 	cont = vos_hdl2cont(dth->dth_coh);
 	D_ASSERT(cont != NULL);
 
+	/* Do not allow a modification with too old an epoch. */
+	if (dth->dth_epoch <= cont->vc_mod_epoch_bound) {
+		D_DEBUG(DB_TRACE, "Need to restart DTX (1) " DF_DTI " with epoch "
+			DF_X64 " vs bound " DF_X64 "\n", DP_DTI(&dth->dth_xid),
+			dth->dth_epoch, cont->vc_mod_epoch_bound);
+		return -DER_TX_RESTART;
+	}
+
+	/*
+	 * NOTE: To efficiently calculate the container based local stable epoch, we
+	 *       maintain some kind of sorted list for active DTX entries in epoch
+	 *       order. But considering the related overhead, it is not easy to
+	 *       maintain a strictly sorted list for all active DTX entries. For a
+	 *       DTX whose leader resides on the current target, its epoch is
+	 *       already sorted when generated on the current engine. So the main
+	 *       difficulty is for those DTX entries whose leaders are on remote targets.
+	 *
+	 *       On the other hand, the local stable epoch is mainly used to generate
+	 *       the global stable epoch that is for incremental reintegration. In
+	 *       fact, we do not need a very accurate global stable epoch for that.
+	 *       It does not matter (it is non-fatal) if the calculated stable epoch
+	 *       is a bit smaller than the real case. For example, seconds of error
+	 *       for the stable epoch can almost be ignored compared with the cost
+	 *       of rebuilding the whole target from scratch. So for a DTX entry
+	 *       whose leader is on a remote target, we maintain it in a list with a
+	 *       relatively incremental trend based on the epoch instead of strictly
+	 *       sorting by epoch. An O(1) algorithm handles such an unsorted list.
+	 *
+	 *       For a distributed transaction, its epoch may be generated on a non-leader.
+	 */
+
+	if (!dth->dth_epoch_owner && !d_list_empty(&cont->vc_dtx_unsorted_list)) {
+		dae = d_list_entry(cont->vc_dtx_unsorted_list.prev, struct vos_dtx_act_ent,
+				   dae_order_link);
+		if (dth->dth_epoch < DAE_EPOCH(dae) &&
+		    cont->vc_mod_epoch_bound < DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap)) {
+			/*
+			 * This guarantees that even if some older DTX were added later, the
+			 * epoch difference between it and all formerly added ones cannot
+			 * exceed vos_agg_gap. So we can easily calculate the local stable
+			 * epoch. Please see vos_cont_get_local_stable_epoch().
+			 */
+			D_DEBUG(DB_TRACE, "Increase acceptable modification boundary from "
+				DF_X64 " to " DF_X64 " for container " DF_UUID "\n",
+				cont->vc_mod_epoch_bound,
+				DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap), DP_UUID(cont->vc_id));
+			cont->vc_mod_epoch_bound = DAE_EPOCH(dae) - d_sec2hlc(vos_agg_gap);
+			if (dth->dth_epoch <= cont->vc_mod_epoch_bound) {
+				D_DEBUG(DB_TRACE, "Need to restart DTX (2) " DF_DTI " with epoch "
+					DF_X64 " vs bound " DF_X64 "\n", DP_DTI(&dth->dth_xid),
+					dth->dth_epoch, cont->vc_mod_epoch_bound);
+				return -DER_TX_RESTART;
+			}
+		}
+	}
+
 	rc = lrua_allocx(cont->vc_dtx_array, &idx, dth->dth_epoch, &dae, &dth->dth_local_stub);
 	if (rc != 0) {
 		/* The array is full, need to commit some transactions first */
@@ -1007,6 +1065,7 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth)
 	}
 
 	D_INIT_LIST_HEAD(&dae->dae_link);
+	D_INIT_LIST_HEAD(&dae->dae_order_link);
 	DAE_LID(dae) = idx + DTX_LID_RESERVED;
 	if (dth->dth_solo)
 		DAE_LID(dae) |= DTX_LID_SOLO_FLAG;
@@ -1015,6 +1074,8 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth)
 	DAE_DKEY_HASH(dae) = dth->dth_dkey_hash;
 	DAE_EPOCH(dae) = dth->dth_epoch;
 	DAE_FLAGS(dae) = dth->dth_flags;
+	if (dth->dth_epoch_owner)
+		DAE_FLAGS(dae) |= DTE_EPOCH_SORTED;
 	DAE_VER(dae) = dth->dth_ver;
 
 	if (dth->dth_mbs != NULL) {
@@ -1043,6 +1104,15 @@ vos_dtx_alloc(struct umem_instance *umm, struct dtx_handle *dth)
 	if (rc == 0) {
 		dae->dae_start_time = daos_gettime_coarse();
 		d_list_add_tail(&dae->dae_link, &cont->vc_dtx_act_list);
+		if (dth->dth_epoch_owner)
+			d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_sorted_list);
+		else
+			/*
+			 * Add all the others, including non-leader(s), into the unsorted list.
+			 * Then even if the leader was evicted for some reason, the related DTX
+			 * can still be considered via the new leader on another target.
+			 */
+			d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_unsorted_list);
 		dth->dth_ent = dae;
 	} else {
 		dtx_evict_lid(cont, dae);
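When `vos_dtx_alloc()` returns -DER_TX_RESTART, the expected behavior upstream is to restart the transaction with a newer epoch. A toy retry loop showing that contract; the errno value and `try_update()` are mocks, not the real API:

```c
#include <stdint.h>
#include <stdio.h>

#define DER_TX_RESTART 2001	/* stand-in errno for the sketch */

/* Mock modification path: reject any epoch at or below the bound, which is
 * the check vos_dtx_alloc() performs against vc_mod_epoch_bound. */
static int try_update(uint64_t epoch, uint64_t bound)
{
	return epoch <= bound ? -DER_TX_RESTART : 0;
}

int main(void)
{
	uint64_t hlc = 1000, bound = 1200;	/* bound moved ahead of us */
	uint64_t epoch = hlc;
	int	 rc, retries = 0;

	do {
		rc = try_update(epoch, bound);
		if (rc == -DER_TX_RESTART) {
			hlc += 300;		/* clock advances */
			epoch = hlc;		/* restart with a newer epoch */
			retries++;
		}
	} while (rc == -DER_TX_RESTART && retries < 3);

	printf("rc=%d after %d restart(s)\n", rc, retries);
	return 0;
}
```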
@@ -2933,11 +3003,18 @@ int
 vos_dtx_act_reindex(struct vos_container *cont)
 {
 	struct umem_instance		*umm = vos_cont2umm(cont);
 	struct vos_cont_df		*cont_df = cont->vc_cont_df;
+	struct vos_dtx_act_ent		*prev = NULL;
 	struct vos_dtx_blob_df		*dbd;
 	umem_off_t			 dbd_off = cont_df->cd_dtx_active_head;
 	d_iov_t				 kiov;
 	d_iov_t				 riov;
+	/* The max epoch for all unsorted DTX entries to be re-indexed. */
+	uint64_t			 max_eph = 0;
+	/* The min epoch among DTX entries that follow the max_eph DTX. */
+	uint64_t			 min_eph = 0;
+	/* The largest 'max_eph - min_eph' difference over all such pairs. */
+	uint64_t			 diff = 0;
 	uint64_t			 start_time = daos_gettime_coarse();
 	int				 rc = 0;
 	int				 i;
@@ -3027,6 +3104,42 @@ vos_dtx_act_reindex(struct vos_container *cont)
 
 			dae->dae_start_time = start_time;
 			d_list_add_tail(&dae->dae_link, &cont->vc_dtx_act_list);
+			if (DAE_FLAGS(dae) & DTE_EPOCH_SORTED) {
+				d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_sorted_list);
+			} else {
+				/*
+				 * The DTX entries in the active blob may be generated against
+				 * different VOS AGG GAP configurations, or even upgraded from
+				 * an old system that did not support the VOS AGG GAP logic
+				 * yet. Link them into a reindex list. During the reindex
+				 * scanning, we find the pairs with the largest epoch difference
+				 * and use that difference to estimate the local stable epoch.
+				 *
+				 * NOTE: min_eph may not be the smallest epoch among all the DTX
+				 *       entries to be re-indexed; instead, it comes after the
+				 *       currently known max_eph, and when max_eph changes,
+				 *       min_eph is reset. So there may be multiple max/min
+				 *       pairs; each has its own difference, and we use the largest.
+				 *
+				 * This is an O(N) algorithm. N is the count of DTX entries to be
+				 * re-indexed. Please see vos_cont_get_local_stable_epoch().
+				 */
+				if (prev == NULL || DAE_EPOCH(dae) > DAE_EPOCH(prev)) {
+					if (max_eph < DAE_EPOCH(dae)) {
+						max_eph = DAE_EPOCH(dae);
+						min_eph = 0;
+					}
+				} else {
+					if (min_eph == 0 || min_eph > DAE_EPOCH(dae)) {
+						min_eph = DAE_EPOCH(dae);
+						if (diff < max_eph - min_eph)
+							diff = max_eph - min_eph;
+					}
+				}
+
+				d_list_add_tail(&dae->dae_order_link, &cont->vc_dtx_reindex_list);
+			}
+			prev = dae;
 
 			dbd_count++;
 		}
@@ -3044,6 +3157,8 @@ vos_dtx_act_reindex(struct vos_container *cont)
 		dbd_off = dbd->dbd_next;
 	}
 
+	cont->vc_dtx_reindex_eph_diff = diff;
+
 out:
 	return rc > 0 ? 0 : rc;
 }
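The same scan, extracted into a standalone function over an epoch array for clarity; the example input has two "dips" below the running maximum, so the larger pair difference wins:

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Standalone model of the reindex scan: walk the entries in list order,
 * track the running maximum, and whenever an entry drops below it, widen
 * the recorded max/min difference.  One pass, O(N), mirroring the logic
 * added to vos_dtx_act_reindex().
 */
static uint64_t
largest_eph_diff(const uint64_t *eph, int nr)
{
	uint64_t max_eph = 0, min_eph = 0, diff = 0;
	int	 i;

	for (i = 0; i < nr; i++) {
		if (i == 0 || eph[i] > eph[i - 1]) {
			if (max_eph < eph[i]) {
				max_eph = eph[i];
				min_eph = 0;	/* new max: reset the pair */
			}
		} else if (min_eph == 0 || min_eph > eph[i]) {
			min_eph = eph[i];
			if (diff < max_eph - min_eph)
				diff = max_eph - min_eph;
		}
	}
	return diff;
}

int main(void)
{
	/* Two dips: 90 under max 120 (diff 30) and 135 under max 150
	 * (diff 15); the scan keeps the larger one, printing 30. */
	uint64_t eph[] = { 100, 120, 90, 150, 135, 160 };

	printf("%llu\n", (unsigned long long)largest_eph_diff(eph, 6));
	return 0;
}
```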
diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h
index 30e92318299a..c2252dc2c4fe 100644
--- a/src/vos/vos_internal.h
+++ b/src/vos/vos_internal.h
@@ -140,6 +140,12 @@ enum {
 /* Throttle ENOSPACE error message */
 #define VOS_NOSPC_ERROR_INTVL	60	/* seconds */
 
+extern uint32_t	vos_agg_gap;
+
+#define VOS_AGG_GAP_MIN		20	/* seconds */
+#define VOS_AGG_GAP_DEF		60
+#define VOS_AGG_GAP_MAX		180
+
 extern unsigned int	vos_agg_nvme_thresh;
 extern bool		vos_dkey_punch_propagate;
 
@@ -359,6 +365,18 @@ struct vos_container {
 	struct btr_root		vc_dtx_committed_btr;
 	/* The list for active DTXs, roughly ordered in time. */
 	d_list_t		vc_dtx_act_list;
+	/* The list for the active DTX entries with sorted epochs. */
+	d_list_t		vc_dtx_sorted_list;
+	/* The list for the active (but not re-indexed) DTX entries with unsorted epochs. */
+	d_list_t		vc_dtx_unsorted_list;
+	/* The list for the active DTX entries re-indexed when opening the container. */
+	d_list_t		vc_dtx_reindex_list;
+	/* The largest epoch difference over the max/min pairs of re-indexed DTX entries. */
+	uint64_t		vc_dtx_reindex_eph_diff;
+	/* The latest calculated local stable epoch. */
+	daos_epoch_t		vc_local_stable_epoch;
+	/* The lowest epoch boundary for currently acceptable modifications. */
+	daos_epoch_t		vc_mod_epoch_bound;
 	/* The count of committed DTXs. */
 	uint32_t		vc_dtx_committed_count;
 	/** Index for timestamp lookup */
@@ -428,6 +446,8 @@ struct vos_dtx_act_ent {
 	daos_unit_oid_t			*dae_oids;
 	/* The time (hlc) when the DTX entry is created. */
 	uint64_t			 dae_start_time;
+	/* Link into container::vc_dtx_{sorted,unsorted,reindex}_list. */
+	d_list_t			 dae_order_link;
 	/* Link into container::vc_dtx_act_list. */
 	d_list_t			 dae_link;
 	/* Back pointer to the DTX handle. */
diff --git a/src/vos/vos_layout.h b/src/vos/vos_layout.h
index 87d092bc882a..40daa55da937 100644
--- a/src/vos/vos_layout.h
+++ b/src/vos/vos_layout.h
@@ -271,8 +271,13 @@ enum vos_io_stream {
 struct vos_cont_ext_df {
 	/* GC bucket extension */
 	struct vos_gc_bkt_df	ced_gc_bkt;
+	/*
+	 * All modifications involved in the current target (container shard) under the
+	 * global stable epoch have already been persistently stored globally.
+	 */
+	uint64_t		ced_global_stable_epoch;
 	/* Reserved for potential new features */
-	uint64_t		ced_paddings[38];
+	uint64_t		ced_paddings[37];
 	/* Reserved for future extension */
 	uint64_t		ced_reserve;
 };
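Replacing one reserved slot with `ced_global_stable_epoch` keeps the persistent extension the same size, which is why `ced_paddings` shrinks from 38 to 37. A compile-time guard in the spirit of that invariant; `gc_bkt_stub` is a stand-in, not the real `vos_gc_bkt_df` layout:

```c
#include <stdint.h>

/* Standalone shadow of the layout change: one reserved uint64_t becomes
 * ced_global_stable_epoch, so the overall size must not move.
 * (Assumption: a same-size stub replaces the embedded vos_gc_bkt_df.) */
struct gc_bkt_stub {
	uint64_t gb_bins[4];	/* stand-in, not the real layout */
};

struct cont_ext_old {
	struct gc_bkt_stub	ced_gc_bkt;
	uint64_t		ced_paddings[38];
	uint64_t		ced_reserve;
};

struct cont_ext_new {
	struct gc_bkt_stub	ced_gc_bkt;
	uint64_t		ced_global_stable_epoch;
	uint64_t		ced_paddings[37];
	uint64_t		ced_reserve;
};

/* Persistent layouts must stay bit-compatible in size. */
_Static_assert(sizeof(struct cont_ext_old) == sizeof(struct cont_ext_new),
	       "vos_cont_ext_df size changed");

int main(void) { return 0; }
```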