diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index f3e91e0d77..61f105f6ac 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -42,6 +42,7 @@ import ( "fmt" "math" "math/rand" + "strconv" "sync" "sync/atomic" "testing" @@ -1731,7 +1732,7 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() { forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) { handle := db.IterWithFlags(key, nil).Handle() - mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle) + mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle, nil) }) forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) { @@ -1742,3 +1743,110 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() { s.Equal(assertNotExist, mutations.IsAssertNotExist(i)) }) } + +func (s *testCommitterSuite) TestSetLockedKeyValue() { + ctx := context.Background() + k1 := []byte("k1") + v1 := []byte("v1") + v2 := []byte("v2") + + mustLockKey := func(txn transaction.TxnProbe, key []byte) { + s.Require().NoError(txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}, key)) + } + checkByOpVals := func(opVals ...interface{}) func(m transaction.CommitterMutations) { + s.Require().Equal(0, len(opVals)%2) + return func(m transaction.CommitterMutations) { + s.Require().Equal(m.Len(), len(opVals)/2) + for i := 0; i < len(opVals); i += 2 { + s.Require().Equal(opVals[i], m.GetOp(0)) + if opVals[i+1] == nil { + s.Require().Nil(m.GetValue(0)) + } else { + s.Require().Equal(opVals[i+1], m.GetValue(0)) + } + } + } + } + + for _, tt := range []struct { + name string + actions []func(txn transaction.TxnProbe) + checkPessimistic func(m transaction.CommitterMutations) + checkOptimisitc func(m transaction.CommitterMutations) + }{ + { + "NoLock", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + }, + checkByOpVals(), + checkByOpVals(), + }, + { + "LockOnly", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, + }, + checkByOpVals(kvrpcpb.Op_Put, v1), + checkByOpVals(kvrpcpb.Op_Lock, nil), + }, + { + "LockAndSet", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, + func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k1, v2)) }, + }, + checkByOpVals(kvrpcpb.Op_Put, v2), + checkByOpVals(kvrpcpb.Op_Put, v2), + }, + { + "LockAndDelete", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, + func(txn transaction.TxnProbe) { s.Require().NoError(txn.Delete(k1)) }, + }, + checkByOpVals(kvrpcpb.Op_Del, []byte{}), + checkByOpVals(kvrpcpb.Op_Del, []byte{}), + }, + } { + var testAll func(name string, state []bool, actions []func(txn transaction.TxnProbe)) + testAll = func(name string, state []bool, actions []func(txn transaction.TxnProbe)) { + if len(actions) == len(tt.actions) { + s.Run("Pessimistic"+name, func() { + txn := s.begin() + txn.SetPessimistic(true) + for _, action := range actions { + action(txn) + } + c, err := txn.NewCommitter(1) + s.Require().NoError(err) + tt.checkPessimistic(c.GetMutations()) + s.Require().NoError(txn.Rollback()) + }) + s.Run("Optimistic"+name, func() { + txn := s.begin() + for _, action := range actions { + action(txn) + } + c, err := txn.NewCommitter(1) + s.Require().NoError(err) + tt.checkOptimisitc(c.GetMutations()) + s.Require().NoError(txn.Rollback()) + }) + return + } + for i, used := range state { + if used { + continue + } + state[i] = true + testAll(name+"-"+strconv.Itoa(i), state, append(actions, tt.actions[i])) + state[i] = false + } + } + testAll(tt.name, make([]bool, len(tt.actions)), nil) + } +} diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 04231c413e..d6108130fa 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -185,6 +185,8 @@ type memBufferMutations struct { // MSB LSB // [13 bits: Op][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock] handles []unionstore.MemKeyHandle + // overlay of mutation values + overlay map[unionstore.MemKeyHandle][]byte } func newMemBufferMutations(sizeHint int, storage *unionstore.MemDB) *memBufferMutations { @@ -211,7 +213,13 @@ func (m *memBufferMutations) GetKeys() [][]byte { } func (m *memBufferMutations) GetValue(i int) []byte { - v, _ := m.storage.GetValueByHandle(m.handles[i]) + h := m.handles[i] + if m.overlay != nil { + if v, ok := m.overlay[h]; ok { + return v + } + } + v, _ := m.storage.GetValueByHandle(h) return v } @@ -235,10 +243,11 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations { return &memBufferMutations{ handles: m.handles[from:to], storage: m.storage, + overlay: m.overlay, } } -func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle) { +func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle, value []byte) { // See comments of `m.handles` field about the format of the user data `aux`. aux := uint16(op) << 3 if isPessimisticLock { @@ -252,6 +261,18 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, } handle.UserData = aux m.handles = append(m.handles, handle) + if len(value) > 0 { + if op != kvrpcpb.Op_Put { + panic("op must be PUT when pushing with value") + } + if !isPessimisticLock { + panic("key must be locked when pushing with value") + } + if m.overlay == nil { + m.overlay = make(map[unionstore.MemKeyHandle][]byte) + } + m.overlay[handle] = value + } } // CommitterMutationFlags represents various bit flags of mutations. @@ -493,7 +514,7 @@ func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, asse } func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { - var size, putCnt, delCnt, lockCnt, checkCnt int + var size, putCnt, delCnt, lockCnt, checkCnt, putFromLockCnt int txn := c.txn memBuf := txn.GetMemBuffer() @@ -508,15 +529,25 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { _ = err key := it.Key() flags := it.Flags() - var value []byte - var op kvrpcpb.Op + var ( + value []byte + cachedValue []byte = nil + op kvrpcpb.Op + ) if !it.HasValue() { if !flags.HasLocked() { continue } - op = kvrpcpb.Op_Lock - lockCnt++ + if val, ok := txn.getValueByLockedKey(key); ok && len(val) > 0 && c.isPessimistic { + // Change the LOCK into PUT if the value of this key has a cached value. + cachedValue = val + op = kvrpcpb.Op_Put + putFromLockCnt++ + } else { + op = kvrpcpb.Op_Lock + lockCnt++ + } } else { value = it.Value() var isUnnecessaryKV bool @@ -581,8 +612,8 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off { mustExist, mustNotExist, hasAssertUnknown = false, false, false } - c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle()) - size += len(key) + len(value) + c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle(), cachedValue) + size += len(key) + len(value) + len(cachedValue) if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off { // Check mutations for pessimistic-locked keys with the read results of pessimistic lock requests. @@ -635,6 +666,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { zap.Int("dels", delCnt), zap.Int("locks", lockCnt), zap.Int("checks", checkCnt), + zap.Int("putsFromLocks", putFromLockCnt), zap.Uint64("txnStartTS", txn.startTS)) } @@ -1758,7 +1790,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch return false, err } handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle() - c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle) + c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle, nil) } } return false, nil diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 22da179423..a03ce0214e 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -85,11 +85,14 @@ type KVTxn struct { startTS uint64 startTime time.Time // Monotonic timestamp for recording txn time consuming. commitTS uint64 - mu sync.Mutex // For thread-safe LockKeys function. + mu sync.Mutex // For thread-safe LockKeys, SetLockedKeyValue functions. setCnt int64 vars *tikv.Variables committer *twoPhaseCommitter lockedCnt int + // lockedKV is used to cache kv pairs that have been locked, the 2pc committer will read this map when init + // mutations, convert lock into put if needed. + lockedKVs map[string][]byte valid bool @@ -749,6 +752,27 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput return nil } +// SetLockedKeyValue caches a key-value pair whose key has been locked. Those key-value pairs may be turned to PUT +// record if possible. +func (txn *KVTxn) SetLockedKeyValue(key []byte, value []byte) { + txn.mu.Lock() + if txn.lockedKVs == nil { + txn.lockedKVs = make(map[string][]byte) + } + txn.lockedKVs[string(key)] = value + txn.mu.Unlock() +} + +// getValueByLockedKey returns the cached value of the given locked key. +func (txn *KVTxn) getValueByLockedKey(key []byte) (value []byte, ok bool) { + txn.mu.Lock() + if txn.lockedKVs != nil { + value, ok = txn.lockedKVs[string(key)] + } + txn.mu.Unlock() + return +} + // deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation. func deduplicateKeys(keys [][]byte) [][]byte { sort.Slice(keys, func(i, j int) bool {