diff --git a/go.mod b/go.mod index 1c866c7e92..2b9529aedf 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/google/uuid v1.1.1 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 github.com/opentracing/opentracing-go v1.1.0 + github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index f28bbb29e3..c2e1d11e44 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1550,3 +1550,51 @@ func (s *testCommitterSuite) TestCommitMultipleRegions() { } s.mustCommit(m) } + +func (s *testCommitterSuite) TestNewlyInsertedMemDBFlag() { + ctx := context.Background() + txn := s.begin() + memdb := txn.GetMemBuffer() + k0 := []byte("k0") + v0 := []byte("v0") + k1 := []byte("k1") + k2 := []byte("k2") + v1 := []byte("v1") + v2 := []byte("v2") + + // Insert after delete, the newly inserted flag should not exist. + err := txn.Delete(k0) + s.Nil(err) + err = txn.Set(k0, v0) + s.Nil(err) + flags, err := memdb.GetFlags(k0) + s.Nil(err) + s.False(flags.HasNewlyInserted()) + + // Lock then insert, the newly inserted flag should exist. + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + err = txn.LockKeys(context.Background(), lockCtx, k1) + s.Nil(err) + err = txn.Set(k1, v1) + s.Nil(err) + flags, err = memdb.GetFlags(k1) + s.Nil(err) + s.True(flags.HasNewlyInserted()) + + // Lock then delete and insert, the newly inserted flag should not exist. + err = txn.LockKeys(ctx, lockCtx, k2) + s.Nil(err) + err = txn.Delete(k2) + s.Nil(err) + flags, err = memdb.GetFlags(k2) + s.Nil(err) + s.False(flags.HasNewlyInserted()) + err = txn.Set(k2, v2) + s.Nil(err) + flags, err = memdb.GetFlags(k2) + s.Nil(err) + s.False(flags.HasNewlyInserted()) + + err = txn.Commit(ctx) + s.Nil(err) +} diff --git a/internal/unionstore/memdb.go b/internal/unionstore/memdb.go index cd0703e060..1348922593 100644 --- a/internal/unionstore/memdb.go +++ b/internal/unionstore/memdb.go @@ -188,7 +188,7 @@ func (db *MemDB) Get(key []byte) ([]byte, error) { panic("vlog is resetted") } - x := db.traverse(key, false) + x, _ := db.traverse(key, false) if x.isNull() { return nil, tikverr.ErrNotExist } @@ -201,7 +201,7 @@ func (db *MemDB) Get(key []byte) ([]byte, error) { // SelectValueHistory select the latest value which makes `predicate` returns true from the modification history. func (db *MemDB) SelectValueHistory(key []byte, predicate func(value []byte) bool) ([]byte, error) { - x := db.traverse(key, false) + x, _ := db.traverse(key, false) if x.isNull() { return nil, tikverr.ErrNotExist } @@ -220,7 +220,7 @@ func (db *MemDB) SelectValueHistory(key []byte, predicate func(value []byte) boo // GetFlags returns the latest flags associated with key. func (db *MemDB) GetFlags(key []byte) (kv.KeyFlags, error) { - x := db.traverse(key, false) + x, _ := db.traverse(key, false) if x.isNull() { return 0, tikverr.ErrNotExist } @@ -314,7 +314,7 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error { if len(db.stages) == 0 { db.dirty = true } - x := db.traverse(key, true) + x, isNewNode := db.traverse(key, true) if len(ops) != 0 { flags := kv.ApplyFlagsOps(x.getKeyFlags(), ops...) @@ -328,6 +328,18 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error { return nil } + // Mark the node for these two situations: + // 1. The node is created by an insert operation and there's no old value in the memdb and storage before. + // 2. The node is locked and then inserted. + // The node marked with `SetNewlyInserted` flag could be turned into an `Op_Lock` operation or discarded + // in the commit mutation generation phase, it's used to optimize the "delete-your-write" case so no dangling + // delete record, check https://github.com/pingcap/tidb/issues/27564 for details. + if len(value) > 0 { + if isNewNode || (x.vptr.isNull() && x.getKeyFlags().HasLocked()) { + x.setKeyFlags(kv.ApplyFlagsOps(x.getKeyFlags(), kv.SetNewlyInserted)) + } + } + db.setValue(x, value) if uint64(db.Size()) > db.bufferSizeLimit { return &tikverr.ErrTxnTooLarge{Size: db.Size()} @@ -360,7 +372,7 @@ func (db *MemDB) setValue(x memdbNodeAddr, value []byte) { // traverse search for and if not found and insert is true, will add a new node in. // Returns a pointer to the new node, or the node found. -func (db *MemDB) traverse(key []byte, insert bool) memdbNodeAddr { +func (db *MemDB) traverse(key []byte, insert bool) (memdbNodeAddr, bool) { x := db.getRoot() y := memdbNodeAddr{nil, nullAddr} found := false @@ -379,7 +391,7 @@ func (db *MemDB) traverse(key []byte, insert bool) memdbNodeAddr { } if found || !insert { - return x + return x, false } z := db.allocNode(key) @@ -471,7 +483,7 @@ func (db *MemDB) traverse(key []byte, insert bool) memdbNodeAddr { // Set the root node black db.getRoot().setBlack() - return z + return z, true } // diff --git a/internal/unionstore/memdb_snapshot.go b/internal/unionstore/memdb_snapshot.go index 6d17bf6102..d30249551c 100644 --- a/internal/unionstore/memdb_snapshot.go +++ b/internal/unionstore/memdb_snapshot.go @@ -73,7 +73,7 @@ type memdbSnapGetter struct { } func (snap *memdbSnapGetter) Get(key []byte) ([]byte, error) { - x := snap.db.traverse(key, false) + x, _ := snap.db.traverse(key, false) if x.isNull() { return nil, tikverr.ErrNotExist } diff --git a/internal/unionstore/memdb_test.go b/internal/unionstore/memdb_test.go index d5d6824070..930160aeb1 100644 --- a/internal/unionstore/memdb_test.go +++ b/internal/unionstore/memdb_test.go @@ -41,9 +41,11 @@ import ( "fmt" "testing" + "github.com/pingcap/errors" leveldb "github.com/pingcap/goleveldb/leveldb/memdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/kv" ) @@ -55,7 +57,7 @@ func init() { // DeleteKey is used in test to verify the `deleteNode` used in `vlog.revertToCheckpoint`. func (db *MemDB) DeleteKey(key []byte) { - x := db.traverse(key, false) + x, _ := db.traverse(key, false) if x.isNull() { return } @@ -838,3 +840,95 @@ func TestBufferLimit(t *testing.T) { err = buffer.Delete(make([]byte, 500)) assert.NotNil(err) } + +func mustPut(t *testing.T, db *MemDB, key []byte, value []byte) { + assertions := assert.New(t) + err := db.set(key, value) + assertions.Nil(err) + + retV1, err := db.Get(key) + assertions.Nil(err) + assertions.Equal(value, retV1) +} + +func mustDelete(t *testing.T, db *MemDB, key []byte) { + assertions := assert.New(t) + err := db.Delete(key) + assertions.Nil(err) + + retV1, err := db.Get(key) + assertions.Nil(err) + assertions.Equal(tombstone, retV1) +} + +func mustGetNone(t *testing.T, db *MemDB, key []byte) { + assertions := assert.New(t) + _, err := db.Get(key) + assertions.Equal(errors.Cause(err), tikverr.ErrNotExist) +} + +func mustGetVal(t *testing.T, db *MemDB, key []byte, value []byte) { + assertions := assert.New(t) + val, err := db.Get(key) + assertions.Nil(err) + assertions.Equal(value, val) +} + +func mustNewlyInsertedFlag(t *testing.T, db *MemDB, key []byte, exist bool) { + assertions := assert.New(t) + flags, err := db.GetFlags(key) + assertions.Nil(err) + if exist { + assertions.True(flags.HasNewlyInserted()) + } else { + assertions.False(flags.HasNewlyInserted()) + } +} + +func TestNewlyInsertedFlag(t *testing.T) { + db := newMemDB() + h := db.Staging() + + // Delete the key after put, the NewlyInserted Flag should be kept. + k1 := []byte("x") + k2 := []byte("y") + k3 := []byte("z") + v1 := make([]byte, 2) + v2 := make([]byte, 3) + v3 := make([]byte, 4) + mustPut(t, db, k1, v1) + mustNewlyInsertedFlag(t, db, k1, true) + mustDelete(t, db, k1) + mustNewlyInsertedFlag(t, db, k1, true) + db.Cleanup(h) + + // Put after delete, there should be no newly inserted flag. + h1 := db.Staging() + mustGetNone(t, db, k1) + mustDelete(t, db, k1) + mustGetVal(t, db, k1, tombstone) + mustPut(t, db, k1, v1) + mustNewlyInsertedFlag(t, db, k1, false) + mustPut(t, db, k1, v2) + mustNewlyInsertedFlag(t, db, k1, false) + db.Release(h1) + + h2 := db.Staging() + mustGetVal(t, db, k1, v2) + mustDelete(t, db, k1) + mustNewlyInsertedFlag(t, db, k1, false) + mustDelete(t, db, k2) + mustNewlyInsertedFlag(t, db, k2, false) + mustPut(t, db, k2, v2) + mustNewlyInsertedFlag(t, db, k2, false) + mustPut(t, db, k3, v3) + mustNewlyInsertedFlag(t, db, k3, true) + mustPut(t, db, k3, v2) + mustNewlyInsertedFlag(t, db, k3, true) + mustDelete(t, db, k3) + mustNewlyInsertedFlag(t, db, k3, true) + db.Release(h2) + + mustNewlyInsertedFlag(t, db, k3, true) + mustNewlyInsertedFlag(t, db, k2, false) +} diff --git a/kv/keyflags.go b/kv/keyflags.go index 133e5079eb..8bbba0440f 100644 --- a/kv/keyflags.go +++ b/kv/keyflags.go @@ -49,6 +49,7 @@ const ( flagPrewriteOnly flagIgnoredIn2PC flagReadable + flagNewlyInserted persistentFlags = flagKeyLocked | flagKeyLockedValExist ) @@ -98,6 +99,11 @@ func (f KeyFlags) AndPersistent() KeyFlags { return f & persistentFlags } +// HasNewlyInserted returns whether the in-transaction key is generated by an "insert" operation. +func (f KeyFlags) HasNewlyInserted() bool { + return f&flagNewlyInserted != 0 +} + // ApplyFlagsOps applys flagspos to origin. func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags { for _, op := range ops { @@ -126,6 +132,8 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags { origin |= flagIgnoredIn2PC case SetReadable: origin |= flagReadable + case SetNewlyInserted: + origin |= flagNewlyInserted } } return origin @@ -160,4 +168,6 @@ const ( SetIgnoredIn2PC // SetReadable marks the key is readable by in-transaction read. SetReadable + // SetNewlyInserted marks the key is newly inserted with value length greater than zero. + SetNewlyInserted ) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 229f9de914..b53524dc14 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -438,10 +438,21 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { checkCnt++ memBuf.UpdateFlags(key, kv.SetPrewriteOnly) } else { - // normal delete keys in optimistic txn can be delete without not exists checking - // delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them - op = kvrpcpb.Op_Del - delCnt++ + if flags.HasNewlyInserted() { + // The delete-your-write keys in pessimistic transactions, only lock needed keys and skip + // other deletes for example the secondary index delete. + // Here if `tidb_constraint_check_in_place` is enabled and the transaction is in optimistic mode, + // the logic is same as the pessimistic mode. + if flags.HasLocked() { + op = kvrpcpb.Op_Lock + lockCnt++ + } else { + continue + } + } else { + op = kvrpcpb.Op_Del + delCnt++ + } } } }