diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 0ce745d172e4a..ce917f7a650ec 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -450,15 +450,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if !e.txn.Valid() { return kv.ErrInvalidTxn } - membuf := e.txn.GetMemBuffer() - for _, idxKey := range indexKeys { - handleVal := handleVals[string(idxKey)] - if len(handleVal) == 0 { - continue - } - err = membuf.Set(idxKey, handleVal) - if err != nil { - return err + txn, ok := e.txn.(interface { + ChangeLockIntoPut(context.Context, kv.Key, []byte) bool + }) + if ok { + for _, idxKey := range indexKeys { + handleVal := handleVals[string(idxKey)] + if len(handleVal) == 0 { + continue + } + txn.ChangeLockIntoPut(ctx, idxKey, handleVal) } } } diff --git a/executor/point_get.go b/executor/point_get.go index f33ba20b5dd5a..a9cc5cb8fa3cd 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -270,10 +270,11 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if !e.txn.Valid() { return kv.ErrInvalidTxn } - memBuffer := e.txn.GetMemBuffer() - err = memBuffer.Set(e.idxKey, e.handleVal) - if err != nil { - return err + txn, ok := e.txn.(interface { + ChangeLockIntoPut(context.Context, kv.Key, []byte) bool + }) + if ok { + txn.ChangeLockIntoPut(ctx, e.idxKey, e.handleVal) } } } diff --git a/go.mod b/go.mod index c47a8ba917b65..c6c2bcc16c5b7 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.1-0.20220614073425-1693f8c71524 + github.com/tikv/client-go/v2 v2.0.1-0.20230328072015-603f10d1c677 github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 48646ce77dde5..4a99ac866ff5f 100644 --- a/go.sum +++ b/go.sum @@ -755,8 +755,8 @@ github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df/go.mod h1:6Fq8o github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.1-0.20220614073425-1693f8c71524 h1:nFTlY55m4gaRML/H44qw2Vg0KpkTISrsHJl5shzfm/g= -github.com/tikv/client-go/v2 v2.0.1-0.20220614073425-1693f8c71524/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs= +github.com/tikv/client-go/v2 v2.0.1-0.20230328072015-603f10d1c677 h1:i4uNjYAs2J2AGQxHyKHVYhfogwNJsUICfjrenylLQCQ= +github.com/tikv/client-go/v2 v2.0.1-0.20230328072015-603f10d1c677/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs= github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk= github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU= github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= diff --git a/session/txn.go b/session/txn.go index ce36621ba8afe..dee4473178d01 100644 --- a/session/txn.go +++ b/session/txn.go @@ -400,6 +400,24 @@ func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...k return err } +// ChangeLockIntoPut tries to cache a locked key-value pair that might be converted to PUT on commit, returns true if +// the key-value pair has been cached. +func (txn *LazyTxn) ChangeLockIntoPut(ctx context.Context, key kv.Key, value []byte) bool { + if len(value) == 0 { + return false + } + cache, ok := txn.Transaction.(interface{ SetLockedKeyValue([]byte, []byte) }) + if !ok { + return false + } + _, err := txn.GetMemBuffer().Get(ctx, key) + if !kv.IsErrNotFound(err) { + return false + } + cache.SetLockedKeyValue(key, value) + return true +} + func (txn *LazyTxn) reset() { txn.cleanup() txn.changeToInvalid() diff --git a/tests/realtikvtest/pessimistictest/pessimistic_test.go b/tests/realtikvtest/pessimistictest/pessimistic_test.go index 2f1fe3478643e..6aaa8eb2ecd88 100644 --- a/tests/realtikvtest/pessimistictest/pessimistic_test.go +++ b/tests/realtikvtest/pessimistictest/pessimistic_test.go @@ -2913,6 +2913,40 @@ func TestChangeLockToPut(t *testing.T) { tk.MustExec("admin check table t1") } +func TestIssue28011(t *testing.T) { + store, clean := realtikvtest.CreateMockStoreAndSetup(t) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + for _, tt := range []struct { + name string + lockQuery string + finalRows [][]interface{} + }{ + {"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")}, + {"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")}, + {"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")}, + {"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")}, + } { + t.Run(tt.name, func(t *testing.T) { + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))") + tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')") + tk.MustExec("begin pessimistic") + tk.MustExec(tt.lockQuery) + tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c")) + tk.MustExec("replace into t values ('b', 'y')") + tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c")) + tk.MustQuery("select a, b from t order by a").Check(tt.finalRows) + tk.MustExec("commit") + tk.MustQuery("select a, b from t order by a").Check(tt.finalRows) + tk.MustExec("admin check table t") + }) + } +} + func createTable(part bool, columnNames []string, columnTypes []string) string { var str string str = "create table t("