Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: sets flagPreviousPresumeKNE after when a stmt finishes which sets Pre… #37625

Merged
merged 11 commits into from
Sep 6, 2022
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2797,8 +2797,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:s1al2ci3MEj5VnNuUCGAfeqpbCxcMeZibOyxw8ClHLE=",
version = "v0.0.0-20220818023518-a0f02b6efcee",
sum = "h1:5q7Ns0R7q6Uj+fpa3lDTijrcqgId4lNdGa2AG7izB5c=",
version = "v0.0.0-20220906053631-2e37953b2b43",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3401,8 +3401,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:pZoPlKWCecxJKL8oRq/se71RTljYDrQlZQ2NzKkMYi0=",
version = "v2.0.1-0.20220830073839-0130f767386c",
sum = "h1:wjRWmUl4QmJF7V0aUskjT8EjjpfWxi5o9SQR5S1nNWA=",
version = "v2.0.1-0.20220906094532-f867f498456f",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
8 changes: 8 additions & 0 deletions docs/design/2022-08-04-pessimistic-lazy-constraint-check.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ The uniqueness constraints are also preserved. For all the keys with a `PresumeK

Due to the "read committed" semantics of DMLs in pessimistic transactions, the late locking could succeed even if duplicated entries exist at the time of `INSERT` because other transactions remove the duplicated entry after that. From the view of our transaction, it's equivalent to the case when other transactions remove the duplicated entry before our `INSERT`. There will be no data corruption after the transaction commits.

#### Assumptions we make

The safety of the feature depends on these assumptions which are all true in TiDB 6.2.

- TiDB does not acquire pessimistic locks for non-unique index keys.
- TiDB does not mark non-unique index keys as `PresumeKeyNotExists`.
- If a key gets marked as `PresumeKeyNotExists`, it must be in the current statement buffer.

#### Safety with multiple operations in one statement

In current TiDB(<=6.2) implementation, the locking phase of pessimistic DML (except SELECT FOR UPDATE) begins after executors. If there are multiple operations on one key in the execution phase, they may not behave like what we expect. For example in the same statement there are operations:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee
github.com/pingcap/kvproto v0.0.0-20220906053631-2e37953b2b43
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -80,7 +80,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20220830073839-0130f767386c
github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee h1:s1al2ci3MEj5VnNuUCGAfeqpbCxcMeZibOyxw8ClHLE=
github.com/pingcap/kvproto v0.0.0-20220818023518-a0f02b6efcee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220906053631-2e37953b2b43 h1:5q7Ns0R7q6Uj+fpa3lDTijrcqgId4lNdGa2AG7izB5c=
github.com/pingcap/kvproto v0.0.0-20220906053631-2e37953b2b43/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -913,8 +913,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20220830073839-0130f767386c h1:pZoPlKWCecxJKL8oRq/se71RTljYDrQlZQ2NzKkMYi0=
github.com/tikv/client-go/v2 v2.0.1-0.20220830073839-0130f767386c/go.mod h1:DqgQZKxPtMRazyELFyj7Ic2iOo+5XGTetxDp2KYnODs=
github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f h1:wjRWmUl4QmJF7V0aUskjT8EjjpfWxi5o9SQR5S1nNWA=
github.com/tikv/client-go/v2 v2.0.1-0.20220906094532-f867f498456f/go.mod h1:tkKDJ88lryb16v7FfCh8pvvfwwCkh4aGeSOqHviPaaE=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db h1:r1eMh9Rny3hfWuBuxOnbsCRrR4FhthiNxLQ5rAUtaww=
github.com/tikv/pd/client v0.0.0-20220725055910-7187a7ab72db/go.mod h1:ew8kS0yIcEaSetuuywkTLIUBR+sz3J5XvAYRae11qwc=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
6 changes: 6 additions & 0 deletions kv/keyflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// the flag indicates the conflict and constraint check of the key should be postponed
// to the next pessimistic lock or prewrite request.
flagNeedConstraintCheckInPrewrite
flagPreviousPresumeKNE
)

// HasPresumeKeyNotExists returns whether the associated key use lazy check.
Expand Down Expand Up @@ -89,6 +90,9 @@ const (
SetAssertNone
// SetNeedConstraintCheckInPrewrite sets the flag flagNeedConstraintCheckInPrewrite
SetNeedConstraintCheckInPrewrite
// SetPreviousPresumeKeyNotExists marks the PNE flag is set in previous statements, thus it cannot be unset when
// retrying or rolling back a statement.
SetPreviousPresumeKeyNotExists
)

// ApplyFlagsOps applys flagspos to origin.
Expand All @@ -110,6 +114,8 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
origin |= flagAssertNotExists
case SetNeedConstraintCheckInPrewrite:
origin |= flagNeedConstraintCheckInPrewrite
case SetPreviousPresumeKeyNotExists:
origin |= flagPreviousPresumeKNE
}
}
return origin
Expand Down
13 changes: 13 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ type MemBuffer interface {
GetFlags(Key) (KeyFlags, error)
// SetWithFlags put key-value into the last active staging buffer with the given KeyFlags.
SetWithFlags(Key, []byte, ...FlagsOp) error
// UpdateFlags updates the flags associated with key.
UpdateFlags(Key, ...FlagsOp)
// DeleteWithFlags delete key with the given KeyFlags
DeleteWithFlags(Key, ...FlagsOp) error

Expand Down Expand Up @@ -180,6 +182,17 @@ type MemBuffer interface {
RemoveFromBuffer(Key)
}

// FindKeysInStage returns all keys in the given stage that satisfies the given condition.
func FindKeysInStage(m MemBuffer, h StagingHandle, predicate func(Key, KeyFlags, []byte) bool) []Key {
result := make([]Key, 0)
m.InspectStage(h, func(k Key, f KeyFlags, v []byte) {
if predicate(k, f, v) {
result = append(result, k)
}
})
return result
}

// LockCtx contains information for LockKeys method.
type LockCtx = tikvstore.LockCtx

Expand Down
14 changes: 14 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type LazyTxn struct {
sync.RWMutex
txninfo.TxnInfo
}

// mark the txn enables lazy uniqueness check in pessimistic transactions.
lazyUniquenessCheckEnabled bool
}

// GetTableInfo returns the cached index name.
Expand Down Expand Up @@ -123,6 +126,16 @@ func (txn *LazyTxn) flushStmtBuf() {
return
}
buf := txn.Transaction.GetMemBuffer()

if txn.lazyUniquenessCheckEnabled {
keysNeedSetPersistentPNE := kv.FindKeysInStage(buf, txn.stagingHandle, func(k kv.Key, flags kv.KeyFlags, v []byte) bool {
return flags.HasPresumeKeyNotExists()
})
for _, key := range keysNeedSetPersistentPNE {
buf.UpdateFlags(key, kv.SetPreviousPresumeKeyNotExists)
}
}

buf.Release(txn.stagingHandle)
txn.initCnt = buf.Len()
}
Expand Down Expand Up @@ -474,6 +487,7 @@ func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Trans
sctx.GetSessionVars().TxnCtx.StartTS = 0
return txn, err
}
txn.lazyUniquenessCheckEnabled = !sctx.GetSessionVars().ConstraintCheckInPlacePessimistic
}
return txn, nil
}
Expand Down
6 changes: 6 additions & 0 deletions store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (m *memBuffer) DeleteWithFlags(k kv.Key, ops ...kv.FlagsOp) error {
return derr.ToTiDBErr(err)
}

func (m *memBuffer) UpdateFlags(k kv.Key, ops ...kv.FlagsOp) {
m.MemDB.UpdateFlags(k, getTiKVFlagsOps(ops)...)
}

func (m *memBuffer) Get(_ context.Context, key kv.Key) ([]byte, error) {
data, err := m.MemDB.Get(key)
return data, derr.ToTiDBErr(err)
Expand Down Expand Up @@ -182,6 +186,8 @@ func getTiKVFlagsOp(op kv.FlagsOp) tikvstore.FlagsOp {
return tikvstore.SetAssertNone
case kv.SetNeedConstraintCheckInPrewrite:
return tikvstore.SetNeedConstraintCheckInPrewrite
case kv.SetPreviousPresumeKeyNotExists:
return tikvstore.SetPreviousPresumeKNE
}
return 0
}
Expand Down
61 changes: 59 additions & 2 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3139,7 +3139,7 @@ func TestPessimisticLockOnPartition(t *testing.T) {
<-ch // wait for goroutine to quit.
}

func TestInsertNotLock(t *testing.T) {
func TestLazyUniquenessCheckForSimpleInserts(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -3174,7 +3174,7 @@ func TestInsertNotLock(t *testing.T) {
tk.MustExec("admin check table t2")
}

func TestDeferConstraintCheck(t *testing.T) {
func TestLazyUniquenessCheck(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -3219,6 +3219,24 @@ func TestDeferConstraintCheck(t *testing.T) {
tk.MustQuery("select * from t2 use index(primary)").Check(testkit.Rows("1 1"))
tk.MustExec("admin check table t2")

// case: a modification of a lazy-checked key will compensate the lock
tk.MustExec("truncate table t2")
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t2 values (1, 1)") // skip lock
tk.MustExec("update t2 set uk = uk + 1") // compensate the lock
ch := make(chan error, 1)
tk2.MustExec("begin pessimistic")
go func() {
tk2.MustExec("update t2 set uk = uk + 10 where id = 1") // should block, and read (1, 2), write (1, 12)
ch <- tk2.ExecToErr("commit")
}()
time.Sleep(500 * time.Millisecond)
tk.MustExec("commit")
err = <-ch
require.NoError(t, err)
tk.MustQuery("select * from t2").Check(testkit.Rows("1 12"))
tk.MustExec("admin check table t")

// case: conflict check failure
tk.MustExec("create table t3 (id int primary key, sk int, key i1(sk))")
tk.MustExec("begin pessimistic")
Expand Down Expand Up @@ -3324,3 +3342,42 @@ func TestLazyUniquenessCheckForInsertIgnore(t *testing.T) {
tk.MustExec("commit")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1"))
}

func TestLazyUniquenessCheckWithStatementRetry(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2.MustExec("use test")
tk.MustExec("create table t5(id int primary key, uk int, unique key i1(uk))")
tk.MustExec("set @@tidb_constraint_check_in_place_pessimistic=0")

// TiKV will perform a constraint check before reporting assertion failure.
// And constraint violation precedes assertion failure.
if !*realtikvtest.WithRealTiKV {
tk.MustExec("set @@tidb_txn_assertion_level=off")
}

// case: update unique key using point-get, but conflict with concurrent write, return error in DML
tk.MustExec("insert into t5 values (1, 1)")
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t5 values (3, 3)") // skip handle=3, uk=3
tk2.MustExec("insert into t5 values (2, 3)")
err := tk.ExecToErr("update t5 set id = 10 where uk = 3") // write conflict -> unset PresumeKNE -> retry
require.Error(t, err)
require.Contains(t, err.Error(), "Duplicate entry '3' for key 'i1'")
require.False(t, tk.Session().GetSessionVars().InTxn())
tk.MustExec("admin check table t5")

// case: update, but conflict with concurrent write, return error in DML
tk.MustExec("truncate table t5")
tk.MustExec("insert into t5 values (1, 1)")
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t5 values (3, 3)") // skip handle=3, uk=3
tk2.MustExec("insert into t5 values (2, 3)")
err = tk.ExecToErr("update t5 set id = id + 10") // write conflict -> unset PresumeKNE -> retry
require.Error(t, err)
require.Contains(t, err.Error(), "Duplicate entry '3' for key 'i1'")
require.False(t, tk.Session().GetSessionVars().InTxn())
tk.MustExec("admin check table t5")
}