Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: add newly inserted flag for memdb key node #369

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad
Expand Down
48 changes: 48 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,3 +1550,51 @@ func (s *testCommitterSuite) TestCommitMultipleRegions() {
}
s.mustCommit(m)
}

func (s *testCommitterSuite) TestNewlyInsertedMemDBFlag() {
ctx := context.Background()
txn := s.begin()
memdb := txn.GetMemBuffer()
k0 := []byte("k0")
v0 := []byte("v0")
k1 := []byte("k1")
k2 := []byte("k2")
v1 := []byte("v1")
v2 := []byte("v2")

// Insert after delete, the newly inserted flag should not exist.
err := txn.Delete(k0)
s.Nil(err)
err = txn.Set(k0, v0)
s.Nil(err)
flags, err := memdb.GetFlags(k0)
s.Nil(err)
s.False(flags.HasNewlyInserted())

// Lock then insert, the newly inserted flag should exist.
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
err = txn.LockKeys(context.Background(), lockCtx, k1)
s.Nil(err)
err = txn.Set(k1, v1)
s.Nil(err)
flags, err = memdb.GetFlags(k1)
s.Nil(err)
s.True(flags.HasNewlyInserted())

// Lock then delete and insert, the newly inserted flag should not exist.
err = txn.LockKeys(ctx, lockCtx, k2)
s.Nil(err)
err = txn.Delete(k2)
s.Nil(err)
flags, err = memdb.GetFlags(k2)
s.Nil(err)
s.False(flags.HasNewlyInserted())
err = txn.Set(k2, v2)
s.Nil(err)
flags, err = memdb.GetFlags(k2)
s.Nil(err)
s.False(flags.HasNewlyInserted())

err = txn.Commit(ctx)
s.Nil(err)
}
26 changes: 19 additions & 7 deletions internal/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (db *MemDB) Get(key []byte) ([]byte, error) {
panic("vlog is resetted")
}

x := db.traverse(key, false)
x, _ := db.traverse(key, false)
if x.isNull() {
return nil, tikverr.ErrNotExist
}
Expand All @@ -201,7 +201,7 @@ func (db *MemDB) Get(key []byte) ([]byte, error) {

// SelectValueHistory select the latest value which makes `predicate` returns true from the modification history.
func (db *MemDB) SelectValueHistory(key []byte, predicate func(value []byte) bool) ([]byte, error) {
x := db.traverse(key, false)
x, _ := db.traverse(key, false)
if x.isNull() {
return nil, tikverr.ErrNotExist
}
Expand All @@ -220,7 +220,7 @@ func (db *MemDB) SelectValueHistory(key []byte, predicate func(value []byte) boo

// GetFlags returns the latest flags associated with key.
func (db *MemDB) GetFlags(key []byte) (kv.KeyFlags, error) {
x := db.traverse(key, false)
x, _ := db.traverse(key, false)
if x.isNull() {
return 0, tikverr.ErrNotExist
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error {
if len(db.stages) == 0 {
db.dirty = true
}
x := db.traverse(key, true)
x, isNewNode := db.traverse(key, true)

if len(ops) != 0 {
flags := kv.ApplyFlagsOps(x.getKeyFlags(), ops...)
Expand All @@ -328,6 +328,18 @@ func (db *MemDB) set(key []byte, value []byte, ops ...kv.FlagsOp) error {
return nil
}

// Mark the node for these two situations:
// 1. The node is created by an insert operation and there's no old value in the memdb and storage before.
// 2. The node is locked and then inserted.
// The node marked with `SetNewlyInserted` flag could be turned into an `Op_Lock` operation or discarded
// in the commit mutation generation phase, it's used to optimize the "delete-your-write" case so no dangling
// delete record, check https://github.com/pingcap/tidb/issues/27564 for details.
if len(value) > 0 {
if isNewNode || (x.vptr.isNull() && x.getKeyFlags().HasLocked()) {
x.setKeyFlags(kv.ApplyFlagsOps(x.getKeyFlags(), kv.SetNewlyInserted))
}
}

db.setValue(x, value)
if uint64(db.Size()) > db.bufferSizeLimit {
return &tikverr.ErrTxnTooLarge{Size: db.Size()}
Expand Down Expand Up @@ -360,7 +372,7 @@ func (db *MemDB) setValue(x memdbNodeAddr, value []byte) {

// traverse search for and if not found and insert is true, will add a new node in.
// Returns a pointer to the new node, or the node found.
func (db *MemDB) traverse(key []byte, insert bool) memdbNodeAddr {
func (db *MemDB) traverse(key []byte, insert bool) (memdbNodeAddr, bool) {
x := db.getRoot()
y := memdbNodeAddr{nil, nullAddr}
found := false
Expand All @@ -379,7 +391,7 @@ func (db *MemDB) traverse(key []byte, insert bool) memdbNodeAddr {
}

if found || !insert {
return x
return x, false
}

z := db.allocNode(key)
Expand Down Expand Up @@ -471,7 +483,7 @@ func (db *MemDB) traverse(key []byte, insert bool) memdbNodeAddr {
// Set the root node black
db.getRoot().setBlack()

return z
return z, true
}

//
Expand Down
2 changes: 1 addition & 1 deletion internal/unionstore/memdb_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type memdbSnapGetter struct {
}

func (snap *memdbSnapGetter) Get(key []byte) ([]byte, error) {
x := snap.db.traverse(key, false)
x, _ := snap.db.traverse(key, false)
if x.isNull() {
return nil, tikverr.ErrNotExist
}
Expand Down
96 changes: 95 additions & 1 deletion internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ import (
"fmt"
"testing"

"github.com/pingcap/errors"
leveldb "github.com/pingcap/goleveldb/leveldb/memdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
)

Expand All @@ -55,7 +57,7 @@ func init() {

// DeleteKey is used in test to verify the `deleteNode` used in `vlog.revertToCheckpoint`.
func (db *MemDB) DeleteKey(key []byte) {
x := db.traverse(key, false)
x, _ := db.traverse(key, false)
if x.isNull() {
return
}
Expand Down Expand Up @@ -838,3 +840,95 @@ func TestBufferLimit(t *testing.T) {
err = buffer.Delete(make([]byte, 500))
assert.NotNil(err)
}

func mustPut(t *testing.T, db *MemDB, key []byte, value []byte) {
assertions := assert.New(t)
err := db.set(key, value)
assertions.Nil(err)

retV1, err := db.Get(key)
assertions.Nil(err)
assertions.Equal(value, retV1)
}

func mustDelete(t *testing.T, db *MemDB, key []byte) {
assertions := assert.New(t)
err := db.Delete(key)
assertions.Nil(err)

retV1, err := db.Get(key)
assertions.Nil(err)
assertions.Equal(tombstone, retV1)
}

func mustGetNone(t *testing.T, db *MemDB, key []byte) {
assertions := assert.New(t)
_, err := db.Get(key)
assertions.Equal(errors.Cause(err), tikverr.ErrNotExist)
}

func mustGetVal(t *testing.T, db *MemDB, key []byte, value []byte) {
assertions := assert.New(t)
val, err := db.Get(key)
assertions.Nil(err)
assertions.Equal(value, val)
}

func mustNewlyInsertedFlag(t *testing.T, db *MemDB, key []byte, exist bool) {
assertions := assert.New(t)
flags, err := db.GetFlags(key)
assertions.Nil(err)
if exist {
assertions.True(flags.HasNewlyInserted())
} else {
assertions.False(flags.HasNewlyInserted())
}
}

func TestNewlyInsertedFlag(t *testing.T) {
db := newMemDB()
h := db.Staging()

// Delete the key after put, the NewlyInserted Flag should be kept.
k1 := []byte("x")
k2 := []byte("y")
k3 := []byte("z")
v1 := make([]byte, 2)
v2 := make([]byte, 3)
v3 := make([]byte, 4)
mustPut(t, db, k1, v1)
mustNewlyInsertedFlag(t, db, k1, true)
mustDelete(t, db, k1)
mustNewlyInsertedFlag(t, db, k1, true)
db.Cleanup(h)

// Put after delete, there should be no newly inserted flag.
h1 := db.Staging()
mustGetNone(t, db, k1)
mustDelete(t, db, k1)
mustGetVal(t, db, k1, tombstone)
mustPut(t, db, k1, v1)
mustNewlyInsertedFlag(t, db, k1, false)
mustPut(t, db, k1, v2)
mustNewlyInsertedFlag(t, db, k1, false)
db.Release(h1)

h2 := db.Staging()
mustGetVal(t, db, k1, v2)
mustDelete(t, db, k1)
mustNewlyInsertedFlag(t, db, k1, false)
mustDelete(t, db, k2)
mustNewlyInsertedFlag(t, db, k2, false)
mustPut(t, db, k2, v2)
mustNewlyInsertedFlag(t, db, k2, false)
mustPut(t, db, k3, v3)
mustNewlyInsertedFlag(t, db, k3, true)
mustPut(t, db, k3, v2)
mustNewlyInsertedFlag(t, db, k3, true)
mustDelete(t, db, k3)
mustNewlyInsertedFlag(t, db, k3, true)
db.Release(h2)

mustNewlyInsertedFlag(t, db, k3, true)
mustNewlyInsertedFlag(t, db, k2, false)
}
10 changes: 10 additions & 0 deletions kv/keyflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
flagPrewriteOnly
flagIgnoredIn2PC
flagReadable
flagNewlyInserted

persistentFlags = flagKeyLocked | flagKeyLockedValExist
)
Expand Down Expand Up @@ -98,6 +99,11 @@ func (f KeyFlags) AndPersistent() KeyFlags {
return f & persistentFlags
}

// HasNewlyInserted returns whether the in-transaction key is generated by an "insert" operation.
func (f KeyFlags) HasNewlyInserted() bool {
return f&flagNewlyInserted != 0
}

// ApplyFlagsOps applys flagspos to origin.
func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
for _, op := range ops {
Expand Down Expand Up @@ -126,6 +132,8 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
origin |= flagIgnoredIn2PC
case SetReadable:
origin |= flagReadable
case SetNewlyInserted:
origin |= flagNewlyInserted
}
}
return origin
Expand Down Expand Up @@ -160,4 +168,6 @@ const (
SetIgnoredIn2PC
// SetReadable marks the key is readable by in-transaction read.
SetReadable
// SetNewlyInserted marks the key is newly inserted with value length greater than zero.
SetNewlyInserted
)
19 changes: 15 additions & 4 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,21 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
checkCnt++
memBuf.UpdateFlags(key, kv.SetPrewriteOnly)
} else {
// normal delete keys in optimistic txn can be delete without not exists checking
// delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them
op = kvrpcpb.Op_Del
delCnt++
if flags.HasNewlyInserted() {
// The delete-your-write keys in pessimistic transactions, only lock needed keys and skip
// other deletes for example the secondary index delete.
// Here if `tidb_constraint_check_in_place` is enabled and the transaction is in optimistic mode,
// the logic is same as the pessimistic mode.
if flags.HasLocked() {
op = kvrpcpb.Op_Lock
lockCnt++
} else {
continue
}
} else {
op = kvrpcpb.Op_Del
delCnt++
}
}
}
}
Expand Down