diff --git a/btree/btree.c b/btree/btree.c index f48c61bf1a6..655b45dee76 100644 --- a/btree/btree.c +++ b/btree/btree.c @@ -595,6 +595,7 @@ enum base_phase { P_LOCKALL, P_LOCK, P_CHECK, + P_SANITY_CHECK, P_MAKESPACE, P_ACT, P_CAPTURE, @@ -3736,6 +3737,15 @@ static int64_t btree_put_kv_tick(struct m0_sm_op *smop) &child_node_addr, P_NEXTDOWN); } else { node_unlock(lev->l_node); + if (bop->bo_opc == M0_BO_UPDATE) { + /** + * TBD : Revisit this part, while + * implmenting UPDATE operation for + * variable value size. + */ + return P_LOCK; + } + if (oi->i_key_found) return P_LOCK; /** @@ -3865,19 +3875,24 @@ static int64_t btree_put_kv_tick(struct m0_sm_op *smop) } } /** Fall through if path_check is successful. */ - case P_MAKESPACE: { - if (oi->i_key_found) { - struct m0_btree_rec rec; - rec.r_flags = M0_BSC_KEY_EXISTS; - int rc = bop->bo_cb.c_act(&bop->bo_cb, &rec); - if (rc) { - lock_op_unlock(tree); - return fail(bop, rc); - } + case P_SANITY_CHECK: { + int rc = 0; + if (bop->bo_opc == M0_BO_PUT && oi->i_key_found) + rc = M0_BSC_KEY_EXISTS; + else if (bop->bo_opc == M0_BO_UPDATE && !oi->i_key_found) + rc = M0_BSC_KEY_NOT_FOUND; + + if (rc) { lock_op_unlock(tree); - return m0_sm_op_sub(&bop->bo_op, P_CLEANUP, P_FINI); + return fail(bop, rc); } - + if (bop->bo_opc == M0_BO_PUT) + return P_MAKESPACE; + else + return P_ACT; + } + case P_MAKESPACE: { + M0_ASSERT(!oi->i_key_found); lev = &oi->i_level[oi->i_used]; struct slot slot_for_right_node = { .s_node = lev->l_node, @@ -3896,16 +3911,18 @@ static int64_t btree_put_kv_tick(struct m0_sm_op *smop) void *p_key; m0_bcount_t vsize; void *p_val; - struct m0_btree_rec *rec; struct slot node_slot; + int rc; lev = &oi->i_level[oi->i_used]; + if (bop->bo_opc == M0_BO_UPDATE) + node_lock(lev->l_node); + node_slot.s_node = lev->l_node; node_slot.s_idx = lev->l_idx; - rec = &node_slot.s_rec; - *rec = REC_INIT(&p_key, &ksize, &p_val, &vsize); + node_slot.s_rec = REC_INIT(&p_key, &ksize, &p_val, &vsize); node_rec(&node_slot); /** @@ -3917,14 +3934,17 @@ static int64_t btree_put_kv_tick(struct m0_sm_op *smop) * revert back the changes made on btree. Detailed * explination is provided at P_MAKESPACE stage. */ - rec->r_flags = M0_BSC_SUCCESS; - int rc = bop->bo_cb.c_act(&bop->bo_cb, rec); + node_slot.s_rec.r_flags = M0_BSC_SUCCESS; + rc = bop->bo_cb.c_act(&bop->bo_cb, &node_slot.s_rec); if (rc) { /* handle if callback fail i.e undo make */ - node_del(node_slot.s_node, node_slot.s_idx, bop->bo_tx); - node_done(&node_slot, bop->bo_tx, true); - node_seq_cnt_update(lev->l_node); - node_fix(lev->l_node, bop->bo_tx); + if (bop->bo_opc == M0_BO_PUT) { + node_del(node_slot.s_node, node_slot.s_idx, + bop->bo_tx); + node_done(&node_slot, bop->bo_tx, true); + node_seq_cnt_update(lev->l_node); + node_fix(lev->l_node, bop->bo_tx); + } node_unlock(lev->l_node); lock_op_unlock(tree); @@ -3945,6 +3965,7 @@ static int64_t btree_put_kv_tick(struct m0_sm_op *smop) } case P_CAPTURE: btree_tx_nodes_capture(oi, bop->bo_tx); + bop->bo_op.o_sm.sm_rc = M0_BSC_SUCCESS; lock_op_unlock(tree); return m0_sm_op_sub(&bop->bo_op, P_CLEANUP, P_FINI); case P_CLEANUP: @@ -4022,17 +4043,24 @@ static struct m0_sm_state_descr btree_states[P_NR] = { [P_LOCK] = { .sd_flags = 0, .sd_name = "P_LOCK", - .sd_allowed = M0_BITS(P_CHECK, P_CAPTURE, P_CLEANUP), + .sd_allowed = M0_BITS(P_CHECK, P_MAKESPACE, P_ACT, P_CAPTURE, + P_CLEANUP), }, [P_CHECK] = { .sd_flags = 0, .sd_name = "P_CHECK", - .sd_allowed = M0_BITS(P_CAPTURE, P_CLEANUP, P_DOWN), + .sd_allowed = M0_BITS(P_CAPTURE, P_MAKESPACE, P_ACT, P_CLEANUP, + P_DOWN), + }, + [P_SANITY_CHECK] = { + .sd_flags = 0, + .sd_name = "P_SANITY_CHECK", + .sd_allowed = M0_BITS(P_MAKESPACE, P_ACT, P_CLEANUP), }, [P_MAKESPACE] = { .sd_flags = 0, .sd_name = "P_MAKESPACE", - .sd_allowed = M0_BITS(P_CLEANUP), + .sd_allowed = M0_BITS(P_CAPTURE, P_CLEANUP), }, [P_ACT] = { .sd_flags = 0, @@ -4108,11 +4136,18 @@ static struct m0_sm_trans_descr btree_trans[] = { { "kvop-lock", P_LOCK, P_CHECK }, { "kvop-lock-check-ht-changed", P_LOCK, P_CLEANUP }, { "put-lock-ft-capture", P_LOCK, P_CAPTURE }, + { "put-lock-ft-makespace", P_LOCK, P_MAKESPACE }, + { "put-lock-ft-act", P_LOCK, P_ACT }, { "kvop-check-height-changed", P_CHECK, P_CLEANUP }, { "kvop-check-height-same", P_CHECK, P_DOWN }, { "put-check-ft-capture", P_CHECK, P_CAPTURE }, + { "put-check-ft-makespace", P_LOCK, P_MAKESPACE }, + { "put-check-ft-act", P_LOCK, P_ACT }, + { "put-sanity-makespace", P_SANITY_CHECK, P_MAKESPACE }, + { "put-sanity-act", P_SANITY_CHECK, P_ACT }, + { "put-sanity-cleanup", P_SANITY_CHECK, P_CLEANUP}, + { "put-makespace-capture", P_MAKESPACE, P_CAPTURE }, { "put-makespace-cleanup", P_MAKESPACE, P_CLEANUP }, - { "put-makespace", P_MAKESPACE, P_ACT }, { "kvop-act", P_ACT, P_CLEANUP }, { "put-del-act", P_ACT, P_CAPTURE }, { "put-capture", P_CAPTURE, P_CLEANUP}, @@ -5603,6 +5638,23 @@ void m0_btree_del(struct m0_btree *arbor, const struct m0_btree_key *key, &btree_conf, &bop->bo_sm_group); } +void m0_btree_update(struct m0_btree *arbor, const struct m0_btree_rec *rec, + const struct m0_btree_cb *cb, uint64_t flags, + struct m0_btree_op *bop, struct m0_be_tx *tx) +{ + bop->bo_opc = M0_BO_UPDATE; + bop->bo_arbor = arbor; + bop->bo_rec = *rec; + bop->bo_cb = *cb; + bop->bo_tx = tx; + bop->bo_flags = flags; + bop->bo_seg = arbor->t_desc->t_seg; + bop->bo_i = NULL; + + m0_sm_op_init(&bop->bo_op, &btree_put_kv_tick, &bop->bo_op_exec, + &btree_conf, &bop->bo_sm_group); +} + #endif #ifndef __KERNEL__ @@ -6119,6 +6171,15 @@ struct cb_data { uint32_t flags; }; +/** + * Put callback will receive record pointer from put routine which will contain + * r_flag which will indicate SUCCESS or ERROR code. + * If put routine is able to find the record, it will set SUCCESS code and + * callback routine needs to put key-value to the given record and return + * success code. + * else put routine will set the ERROR code to indicate failure and it is + * expected to return ERROR code by callback routine. + */ static int btree_kv_put_cb(struct m0_btree_cb *cb, struct m0_btree_rec *rec) { struct m0_bufvec_cursor scur; @@ -6129,9 +6190,7 @@ static int btree_kv_put_cb(struct m0_btree_cb *cb, struct m0_btree_rec *rec) /** The caller can look at these flags if he needs to. */ datum->flags = rec->r_flags; - - if (rec->r_flags == M0_BSC_KEY_EXISTS) - return M0_BSC_KEY_EXISTS; + M0_ASSERT(datum->flags == M0_BSC_SUCCESS); ksize = m0_vec_count(&datum->key->k_data.ov_vec); M0_ASSERT(m0_vec_count(&rec->r_key.k_data.ov_vec) >= ksize); @@ -6199,7 +6258,7 @@ static int btree_kv_get_cb(struct m0_btree_cb *cb, struct m0_btree_rec *rec) v_off += vsize; } - /** + /** * If check_failed then maybe this entry was updated in which * case we use the complement of the key for comparison. */ @@ -6211,6 +6270,7 @@ static int btree_kv_get_cb(struct m0_btree_cb *cb, struct m0_btree_rec *rec) m0_bufvec_cursor_copyfrom(&vcur, &value, vsize); if (key != value) M0_ASSERT(0); + v_off += vsize; } } } @@ -6228,6 +6288,27 @@ static int btree_kv_del_cb(struct m0_btree_cb *cb, struct m0_btree_rec *rec) return rec->r_flags; } +static int btree_kv_update_cb(struct m0_btree_cb *cb, struct m0_btree_rec *rec) +{ + struct m0_bufvec_cursor scur; + struct m0_bufvec_cursor dcur; + m0_bcount_t vsize; + struct cb_data *datum = cb->c_datum; + + /** The caller can look at these flags if he needs to. */ + datum->flags = rec->r_flags; + M0_ASSERT(datum->flags == M0_BSC_SUCCESS); + + vsize = m0_vec_count(&datum->value->ov_vec); + M0_ASSERT(m0_vec_count(&rec->r_val.ov_vec) >= vsize); + + m0_bufvec_cursor_init(&scur, datum->value); + m0_bufvec_cursor_init(&dcur, &rec->r_val); + m0_bufvec_cursor_copy(&dcur, &scur, vsize); + + return 0; +} + /** * This unit test exercises the KV operations for both valid and invalid * conditions. @@ -6521,10 +6602,11 @@ static void ut_multi_stream_kv_oper(void) rc = m0_be_tx_open_sync(tx); M0_ASSERT(rc == 0); - M0_BTREE_OP_SYNC_WITH_RC(&kv_op, - m0_btree_put(tree, &rec, - &ut_cb, 0, - &kv_op, tx)); + rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_put(tree, &rec, + &ut_cb, 0, + &kv_op, tx)); + M0_ASSERT(rc == M0_BSC_SUCCESS); m0_be_tx_close_sync(tx); m0_be_tx_fini(tx); } @@ -6779,6 +6861,7 @@ static void btree_ut_kv_oper_thread_handler(struct btree_ut_thread_info *ti) int32_t r; uint64_t iter_dir; uint64_t del_key; + int rc; key_first = key_iter_start; if (ti->ti_random_bursts) { @@ -6818,19 +6901,45 @@ static void btree_ut_kv_oper_thread_handler(struct btree_ut_thread_info *ti) cred = M0_BE_TX_CB_CREDIT(0, 0, 0); btree_callback_credit(&cred); - M0_BTREE_OP_SYNC_WITH_RC(&kv_op, - m0_btree_put(tree, &rec, - &ut_cb, 0, - &kv_op, tx)); - M0_ASSERT(data.flags == M0_BSC_SUCCESS); + rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_put(tree, &rec, + &ut_cb, 0, + &kv_op, tx)); + M0_ASSERT(rc == M0_BSC_SUCCESS && + data.flags == M0_BSC_SUCCESS); keys_put_count++; key_first += ti->ti_key_incr; } -#if 0 - /** Modify at least 20% of the values which have been inserted. */ + /** + * Execute one error case where we PUT a key which already + * exists in the btree. + */ key_first = key_iter_start; + if ((key_last - key_first) > (ti->ti_key_incr * 2)) + key_first += ti->ti_key_incr; + key[0] = (key_first << (sizeof(ti->ti_thread_id) * 8)) + + ti->ti_thread_id; + key[0] = m0_byteorder_cpu_to_be64(key[0]); + for (i = 1; i < ARRAY_SIZE(key); i++) + key[i] = key[0]; + /** Skip initializing the value as this is an error case */ + + cred = M0_BE_TX_CB_CREDIT(0, 0, 0); + btree_callback_credit(&cred); + + rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_put(tree, &rec, &ut_cb, + 0, &kv_op, tx)); + M0_ASSERT(rc == M0_BSC_KEY_EXISTS); + + /** Modify at least 20% of the values which have been inserted. */ + + key_first = key_iter_start; + ut_cb.c_act = btree_kv_update_cb; + ut_cb.c_datum = &data; + while (key_first <= key_last) { /** * Embed the thread-id in LSB so that different threads @@ -6851,15 +6960,39 @@ static void btree_ut_kv_oper_thread_handler(struct btree_ut_thread_info *ti) cred = M0_BE_TX_CB_CREDIT(0, 0, 0); btree_callback_credit(&cred); - M0_BTREE_OP_SYNC_WITH_RC(&kv_op, - m0_btree_put(tree, &rec, - &ut_cb, 0, - &kv_op, tx)); - M0_ASSERT(data.flags == M0_BSC_SUCCESS); + rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_update(tree, + &rec, + &ut_cb, 0, + &kv_op, + tx)); + M0_ASSERT(rc == M0_BSC_SUCCESS && + data.flags == M0_BSC_SUCCESS); key_first += (ti->ti_key_incr * 5); } -#endif + + /** + * Execute one error case where we UPDATE a key that does not + * exist in the btree. + */ + key_first = key_iter_start; + key[0] = (key_first << (sizeof(ti->ti_thread_id) * 8)) + + (typeof(ti->ti_thread_id))-1; + key[0] = m0_byteorder_cpu_to_be64(key[0]); + for (i = 1; i < ARRAY_SIZE(key); i++) + key[i] = key[0]; + /** Skip initializing the value as this is an error case */ + + cred = M0_BE_TX_CB_CREDIT(0, 0, 0); + btree_callback_credit(&cred); + + rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_update(tree, &rec, + &ut_cb, 0, + &kv_op, tx)); + M0_ASSERT(rc == M0_BSC_KEY_NOT_FOUND); + /** GET and ITERATE over the keys which we inserted above. */ @@ -7352,7 +7485,8 @@ static void btree_ut_tree_oper_thread_handler(struct btree_ut_thread_info *ti) m0_btree_put(tree, &rec, &ut_cb, 0, &kv_op, tx)); - M0_ASSERT(data.flags == M0_BSC_SUCCESS && rc == 0); + M0_ASSERT(data.flags == M0_BSC_SUCCESS && + rc == M0_BSC_SUCCESS); } rc = M0_BTREE_OP_SYNC_WITH_RC(&b_op, @@ -7750,10 +7884,10 @@ static void ut_invariant_check(struct td *tree) } /** - * This ut will put records in the tree and delete those records in sequencial - * manner. + * This ut will put records in the tree then update and delete those records in + * sequencial manner. */ -static void ut_put_del_operation(void) +static void ut_put_update_del_operation(void) { struct m0_btree_type btree_type = {.tt_id = M0_BT_UT_KV_OPS, .ksize = 8, @@ -7820,13 +7954,61 @@ static void ut_put_del_operation(void) ut_cb.c_act = btree_kv_put_cb; ut_cb.c_datum = &put_data; - M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + int rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, m0_btree_put(tree, &rec, &ut_cb, 0, &kv_op, tx)); - if (put_data.flags == M0_BSC_KEY_EXISTS) { - printf("M0_BSC_KEY_EXISTS "); - } else { - M0_ASSERT(put_data.flags == M0_BSC_SUCCESS); + M0_ASSERT(rc == M0_BSC_SUCCESS); + + if (i >= total_records - 5) { + int rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_put(tree, + &rec, + &ut_cb, + 0, + &kv_op, + tx)); + M0_ASSERT(rc == M0_BSC_KEY_EXISTS); + } + + } + printf("level : %d\n", node_level(tree->t_desc->t_root)); + M0_ASSERT(node_expensive_invariant(tree->t_desc->t_root)); + + inc = false; + for (i = 0; i < 1000000; i++) { + uint64_t key; + uint64_t value; + struct cb_data update_data; + struct m0_btree_rec rec; + m0_bcount_t ksize = sizeof key; + m0_bcount_t vsize = sizeof value; + void *k_ptr = &key; + void *v_ptr = &value; + + /** + * There is a very low possibility of hitting the same key + * again. This is fine as it helps debug the code when insert + * is called with the same key instead of update function. + */ + int temp = inc ? total_records - i : i; + key = m0_byteorder_cpu_to_be64(temp); + value = m0_byteorder_cpu_to_be64(temp + 1); + + rec.r_key.k_data = M0_BUFVEC_INIT_BUF(&k_ptr, &ksize); + rec.r_val = M0_BUFVEC_INIT_BUF(&v_ptr, &vsize); + + update_data.key = &rec.r_key; + update_data.value = &rec.r_val; + + ut_cb.c_act = btree_kv_update_cb; + ut_cb.c_datum = &update_data; + + int rc = M0_BTREE_OP_SYNC_WITH_RC(&kv_op, + m0_btree_update(tree, &rec, &ut_cb, 0, + &kv_op, tx)); + if (rc) { + M0_ASSERT(rc == M0_BSC_KEY_NOT_FOUND); + printf("M0_BSC_KEY_NOT_FOUND "); } } @@ -7943,7 +8125,7 @@ struct m0_ut_suite btree_ut = { {"multi_thread_tree_op", ut_mt_tree_oper}, {"node_create_delete", ut_node_create_delete}, {"node_add_del_rec", ut_node_add_del_rec}, - /* {"btree_kv_add_del", ut_put_del_operation}, */ + /* {"btree_kv_add_upd_del", ut_put_update_del_operation}, */ {NULL, NULL} } }; diff --git a/btree/btree.h b/btree/btree.h index 816ffcdc3b0..efa9d99232b 100644 --- a/btree/btree.h +++ b/btree/btree.h @@ -27,6 +27,7 @@ #include "lib/types.h" #include "lib/vec.h" #include "xcode/xcode_attr.h" +#include "lib/errno.h" /** * @defgroup btree @@ -113,6 +114,7 @@ enum m0_btree_opcode { M0_BO_DESTROY, M0_BO_GET, M0_BO_PUT, + M0_BO_UPDATE, M0_BO_DEL, M0_BO_ITER, @@ -134,8 +136,8 @@ enum m0_btree_op_flags { */ enum m0_btree_status_codes { M0_BSC_SUCCESS = 0, - M0_BSC_KEY_EXISTS, - M0_BSC_KEY_NOT_FOUND, + M0_BSC_KEY_EXISTS = EEXIST, + M0_BSC_KEY_NOT_FOUND = ENOENT, M0_BSC_KEY_BTREE_BOUNDARY, }; diff --git a/sm/op.c b/sm/op.c index 6a83f067b39..4ba6bbad5de 100644 --- a/sm/op.c +++ b/sm/op.c @@ -118,7 +118,6 @@ bool m0_sm_op_tick(struct m0_sm_op *op) { int64_t result; bool wait; - bool fail = false; struct m0_sm_op_exec *ceo = op->o_ceo; M0_ASSERT(op_invariant(op)); @@ -142,7 +141,6 @@ bool m0_sm_op_tick(struct m0_sm_op *op) } else result = op->o_tick(op); if (result < 0) { - fail = true; op->o_sm.sm_rc = result; result = M0_SOS_DONE; } else if (result == M0_SMOP_SAME) { @@ -152,8 +150,8 @@ bool m0_sm_op_tick(struct m0_sm_op *op) } wait = (result & M0_SMOP_WAIT) != 0; M0_ASSERT(ergo(wait, ceo->oe_vec->eo_is_armed(ceo))); - if (fail) - m0_sm_state_and_rc_set(&op->o_sm, + if (op->o_sm.sm_rc != 0) + m0_sm_state_and_rc_set(&op->o_sm, result & ~M0_SMOP_WAIT, op->o_sm.sm_rc); else