Skip to content

Commit

Permalink
executor,session: reimplement lock->put by SetLockedKeyValue (#42642)
Browse files Browse the repository at this point in the history
ref #28011
  • Loading branch information
zyguan authored Mar 28, 2023
1 parent 3282527 commit 0c7e204
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 16 deletions.
19 changes: 10 additions & 9 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
membuf := e.txn.GetMemBuffer()
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
txn, ok := e.txn.(interface {
ChangeLockIntoPut(context.Context, kv.Key, []byte) bool
})
if ok {
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
txn.ChangeLockIntoPut(ctx, idxKey, handleVal)
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,11 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
if err != nil {
return err
txn, ok := e.txn.(interface {
ChangeLockIntoPut(context.Context, kv.Key, []byte) bool
})
if ok {
txn.ChangeLockIntoPut(ctx, e.idxKey, e.handleVal)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20220614073425-1693f8c71524
github.com/tikv/client-go/v2 v2.0.1-0.20230328072015-603f10d1c677
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,8 @@ github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df/go.mod h1:6Fq8o
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20220614073425-1693f8c71524 h1:nFTlY55m4gaRML/H44qw2Vg0KpkTISrsHJl5shzfm/g=
github.com/tikv/client-go/v2 v2.0.1-0.20220614073425-1693f8c71524/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/client-go/v2 v2.0.1-0.20230328072015-603f10d1c677 h1:i4uNjYAs2J2AGQxHyKHVYhfogwNJsUICfjrenylLQCQ=
github.com/tikv/client-go/v2 v2.0.1-0.20230328072015-603f10d1c677/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
Expand Down
18 changes: 18 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,24 @@ func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...k
return err
}

// ChangeLockIntoPut tries to cache a locked key-value pair that might be converted to PUT on commit, returns true if
// the key-value pair has been cached.
func (txn *LazyTxn) ChangeLockIntoPut(ctx context.Context, key kv.Key, value []byte) bool {
if len(value) == 0 {
return false
}
cache, ok := txn.Transaction.(interface{ SetLockedKeyValue([]byte, []byte) })
if !ok {
return false
}
_, err := txn.GetMemBuffer().Get(ctx, key)
if !kv.IsErrNotFound(err) {
return false
}
cache.SetLockedKeyValue(key, value)
return true
}

func (txn *LazyTxn) reset() {
txn.cleanup()
txn.changeToInvalid()
Expand Down
34 changes: 34 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,6 +2913,40 @@ func TestChangeLockToPut(t *testing.T) {
tk.MustExec("admin check table t1")
}

func TestIssue28011(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

for _, tt := range []struct {
name string
lockQuery string
finalRows [][]interface{}
}{
{"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")},
{"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")},
{"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")},
{"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")},
} {
t.Run(tt.name, func(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))")
tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')")
tk.MustExec("begin pessimistic")
tk.MustExec(tt.lockQuery)
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustExec("replace into t values ('b', 'y')")
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("commit")
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("admin check table t")
})
}
}

func createTable(part bool, columnNames []string, columnTypes []string) string {
var str string
str = "create table t("
Expand Down

0 comments on commit 0c7e204

Please sign in to comment.