Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, store/tikv: locks exist keys for point_get & batch_point_get #21229

Merged
merged 17 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 32 additions & 20 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
var indexKeys []kv.Key
var err error
batchGetter := e.batchGetter
rc := e.ctx.GetSessionVars().IsPessimisticReadConsistency()
if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) {
// `SELECT a, b FROM t WHERE (a, b) IN ((1, 2), (1, 2), (2, 1), (1, 2))` should not return duplicated rows
dedup := make(map[hack.MutableString]struct{})
keys := make([]kv.Key, 0, len(e.idxVals))
toFetchIndexKeys := make([]kv.Key, 0, len(e.idxVals))
for _, idxVals := range e.idxVals {
// For all x, 'x IN (null)' evaluate to null, so the query get no result.
if datumsContainNull(idxVals) {
Expand All @@ -197,34 +198,42 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
continue
}
dedup[s] = struct{}{}
keys = append(keys, idxKey)
toFetchIndexKeys = append(toFetchIndexKeys, idxKey)
}
if e.keepOrder {
sort.Slice(keys, func(i int, j int) bool {
sort.Slice(toFetchIndexKeys, func(i int, j int) bool {
if e.desc {
return keys[i].Cmp(keys[j]) > 0
return toFetchIndexKeys[i].Cmp(toFetchIndexKeys[j]) > 0
}
return keys[i].Cmp(keys[j]) < 0
return toFetchIndexKeys[i].Cmp(toFetchIndexKeys[j]) < 0
})
}
indexKeys = keys

// lock all keys in repeatable read isolation.
// for read consistency, only lock exist keys,
// indexKeys will be generated after getting handles.
if !rc {
indexKeys = toFetchIndexKeys
} else {
indexKeys = make([]kv.Key, 0, len(toFetchIndexKeys))
}

// SELECT * FROM t WHERE x IN (null), in this case there is no key.
if len(keys) == 0 {
if len(toFetchIndexKeys) == 0 {
return nil
}

// Fetch all handles.
handleVals, err = batchGetter.BatchGet(ctx, keys)
handleVals, err = batchGetter.BatchGet(ctx, toFetchIndexKeys)
if err != nil {
return err
}

e.handles = make([]kv.Handle, 0, len(keys))
e.handles = make([]kv.Handle, 0, len(toFetchIndexKeys))
if e.tblInfo.Partition != nil {
e.physIDs = make([]int64, 0, len(keys))
e.physIDs = make([]int64, 0, len(toFetchIndexKeys))
}
for _, key := range keys {
for _, key := range toFetchIndexKeys {
handleVal := handleVals[string(key)]
if len(handleVal) == 0 {
continue
Expand All @@ -234,6 +243,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
return err1
}
e.handles = append(e.handles, handle)
if rc {
indexKeys = append(indexKeys, key)
}
if e.tblInfo.Partition != nil {
pid := tablecodec.DecodeTableID(key)
e.physIDs = append(e.physIDs, pid)
Expand Down Expand Up @@ -310,17 +322,11 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}

var values map[string][]byte
rc := e.ctx.GetSessionVars().IsPessimisticReadConsistency()
// Lock keys (include exists and non-exists keys) before fetch all values for Repeatable Read Isolation.
if e.lock && !rc {
lockKeys := make([]kv.Key, len(keys), len(keys)+len(indexKeys))
lockKeys := make([]kv.Key, len(keys)+len(indexKeys))
you06 marked this conversation as resolved.
Show resolved Hide resolved
copy(lockKeys, keys)
for _, idxKey := range indexKeys {
// lock the non-exist index key, using len(val) in case BatchGet result contains some zero len entries
if val := handleVals[string(idxKey)]; len(val) == 0 {
lockKeys = append(lockKeys, idxKey)
}
}
copy(lockKeys[len(keys):], indexKeys)
err = LockKeys(ctx, e.ctx, e.waitTime, lockKeys...)
if err != nil {
return err
Expand All @@ -334,7 +340,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
handles := make([]kv.Handle, 0, len(values))
var existKeys []kv.Key
if e.lock && rc {
existKeys = make([]kv.Key, 0, len(values))
existKeys = make([]kv.Key, 0, 2*len(values))
}
e.values = make([][]byte, 0, len(values))
for i, key := range keys {
Expand All @@ -350,6 +356,12 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
handles = append(handles, e.handles[i])
if e.lock && rc {
existKeys = append(existKeys, key)
// when e.handles is set in builder directly, index should be primary key and the plan is CommonHandleRead
// with clustered index enabled, indexKeys is empty in this situation
// lock primary key for clustered index table is redundant
if len(indexKeys) != 0 {
existKeys = append(existKeys, indexKeys[i])
}
}
}
// Lock exists keys only for Read Committed Isolation.
Expand Down
147 changes: 147 additions & 0 deletions executor/batch_point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
package executor_test

import (
"fmt"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -162,3 +166,146 @@ func (s *testBatchPointGetSuite) TestBatchPointGetUnsignedHandleWithSort(c *C) {
tk.MustQuery("select id from t2 where id in (8738875760185212610, 1, 9814441339970117597) order by id").Check(testkit.Rows("1", "8738875760185212610", "9814441339970117597"))
tk.MustQuery("select id from t2 where id in (8738875760185212610, 1, 9814441339970117597) order by id desc").Check(testkit.Rows("9814441339970117597", "8738875760185212610", "1"))
}

func (s *testBatchPointGetSuite) TestBatchPointGetLockExistKey(c *C) {
var wg sync.WaitGroup
errCh := make(chan error)

testLock := func(rc bool, key string, tableName string) {
doneCh := make(chan struct{}, 1)
tk1, tk2 := testkit.NewTestKit(c, s.store), testkit.NewTestKit(c, s.store)

errCh <- tk1.ExecToErr("use test")
errCh <- tk2.ExecToErr("use test")
errCh <- tk1.ExecToErr("set session tidb_enable_clustered_index = 0")

errCh <- tk1.ExecToErr(fmt.Sprintf("drop table if exists %s", tableName))
errCh <- tk1.ExecToErr(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key))
errCh <- tk1.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 1), (2, 2, 2)", tableName))

if rc {
errCh <- tk1.ExecToErr("set tx_isolation = 'READ-COMMITTED'")
errCh <- tk2.ExecToErr("set tx_isolation = 'READ-COMMITTED'")
}

errCh <- tk1.ExecToErr("begin pessimistic")
errCh <- tk2.ExecToErr("begin pessimistic")

// select for update
if !rc {
// lock exist key only for repeatable read
errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where (id, v) in ((1, 1), (2, 2)) for update", tableName))
} else {
// read committed will not lock non-exist key
errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where (id, v) in ((1, 1), (2, 2), (3, 3)) for update", tableName))
}
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName))
go func() {
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
doneCh <- struct{}{}
}()

time.Sleep(150 * time.Millisecond)
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName))

errCh <- tk1.ExecToErr("commit")
<-doneCh
errCh <- tk2.ExecToErr("commit")
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
"1 2 1",
"2 2 2",
"3 3 3",
"1 1 10",
))

// update
errCh <- tk1.ExecToErr("begin pessimistic")
errCh <- tk2.ExecToErr("begin pessimistic")
if !rc {
// lock exist key only for repeatable read
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = v + 1 where (id, v) in ((2, 2), (3, 3))", tableName))
} else {
// read committed will not lock non-exist key
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = v + 1 where (id, v) in ((2, 2), (3, 3), (4, 4))", tableName))
}
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName))
go func() {
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName))
doneCh <- struct{}{}
}()
time.Sleep(150 * time.Millisecond)
errCh <- tk1.ExecToErr("commit")
<-doneCh
errCh <- tk2.ExecToErr("commit")
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
"1 2 1",
"2 3 2",
"3 4 3",
"1 1 10",
"4 4 4",
"3 3 30",
))

// delete
errCh <- tk1.ExecToErr("begin pessimistic")
errCh <- tk2.ExecToErr("begin pessimistic")
if !rc {
// lock exist key only for repeatable read
errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where (id, v) in ((3, 4), (4, 4))", tableName))
} else {
// read committed will not lock non-exist key
errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where (id, v) in ((3, 4), (4, 4), (5, 5))", tableName))
}
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(5, 5, 5)", tableName))
go func() {
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(4, 4,40)", tableName))
doneCh <- struct{}{}
}()
time.Sleep(150 * time.Millisecond)
errCh <- tk1.ExecToErr("commit")
<-doneCh
errCh <- tk2.ExecToErr("commit")
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
"1 2 1",
"2 3 2",
"1 1 10",
"3 3 30",
"5 5 5",
"4 4 40",
))
wg.Done()
}

for i, one := range []struct {
rc bool
key string
}{
{rc: false, key: "primary key"},
{rc: false, key: "unique key"},
{rc: true, key: "primary key"},
{rc: true, key: "unique key"},
} {
wg.Add(1)
tableName := fmt.Sprintf("t_%d", i)
go testLock(one.rc, one.key, tableName)
}

// should works for common handle in clustered index
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id varchar(40) primary key)")
tk.MustExec("insert into t values('1'), ('2')")
tk.MustExec("set tx_isolation = 'READ-COMMITTED'")
tk.MustExec("begin pessimistic")
tk.MustExec("select * from t where id in('1', '2') for update")
tk.MustExec("commit")

go func() {
wg.Wait()
close(errCh)
}()
for err := range errCh {
c.Assert(err, IsNil)
}
}
16 changes: 11 additions & 5 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,19 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
}
if len(e.handleVal) == 0 {
// handle is not found, try lock the index key if isolation level is not read consistency
if e.ctx.GetSessionVars().IsPessimisticReadConsistency() {
return nil

// try lock the index key if isolation level is not read consistency
// also lock key if read consistency read a value
if !e.ctx.GetSessionVars().IsPessimisticReadConsistency() || len(e.handleVal) > 0 {
err = e.lockKeyIfNeeded(ctx, e.idxKey)
if err != nil {
return err
}
return e.lockKeyIfNeeded(ctx, e.idxKey)
}
if len(e.handleVal) == 0 {
return nil
}

var iv kv.Handle
iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle)
if err != nil {
Expand Down
Loading