Skip to content

Commit

Permalink
executor: fix unstable test in the TestPointGetLockExistKey (#32949)
Browse files Browse the repository at this point in the history
close #32948
  • Loading branch information
hawkingrei authored Mar 9, 2022
1 parent 7408e68 commit b88d62e
Showing 1 changed file with 50 additions and 59 deletions.
109 changes: 50 additions & 59 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

Expand All @@ -32,6 +31,7 @@ import (
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/testdata"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -682,72 +682,70 @@ func TestPointGetWriteLock(t *testing.T) {
}

func TestPointGetLockExistKey(t *testing.T) {
var wg sync.WaitGroup
errCh := make(chan error)
var wg util.WaitGroupWrapper

testLock := func(rc bool, key string, tableName string) {
doneCh := make(chan struct{}, 1)
store, clean := testkit.CreateMockStore(t)
defer clean()
tk1, tk2 := testkit.NewTestKit(t, store), testkit.NewTestKit(t, store)

errCh <- tk1.ExecToErr("use test")
errCh <- tk2.ExecToErr("use test")
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly

errCh <- tk1.ExecToErr(fmt.Sprintf("drop table if exists %s", tableName))
errCh <- tk1.ExecToErr(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key))
errCh <- tk1.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 1)", tableName))
tk1.MustExec(fmt.Sprintf("drop table if exists %s", tableName))
tk1.MustExec(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key))
tk1.MustExec(fmt.Sprintf("insert into %s values(1, 1, 1)", tableName))

if rc {
errCh <- tk1.ExecToErr("set tx_isolation = 'READ-COMMITTED'")
errCh <- tk2.ExecToErr("set tx_isolation = 'READ-COMMITTED'")
tk1.MustExec("set tx_isolation = 'READ-COMMITTED'")
tk2.MustExec("set tx_isolation = 'READ-COMMITTED'")
}

// select for update
errCh <- tk1.ExecToErr("begin pessimistic")
errCh <- tk2.ExecToErr("begin pessimistic")
tk1.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
// lock exist key
errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where id = 1 and v = 1 for update", tableName))
tk1.MustExec(fmt.Sprintf("select * from %s where id = 1 and v = 1 for update", tableName))
// read committed will not lock non-exist key
if rc {
errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where id = 2 and v = 2 for update", tableName))
tk1.MustExec(fmt.Sprintf("select * from %s where id = 2 and v = 2 for update", tableName))
}
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(2, 2, 2)", tableName))
go func() {
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
tk2.MustExec(fmt.Sprintf("insert into %s values(2, 2, 2)", tableName))
var wg3 util.WaitGroupWrapper
wg3.Run(func() {
tk2.MustExec(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
// tk2.MustExec(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
doneCh <- struct{}{}
}()
})
time.Sleep(150 * time.Millisecond)
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName))
errCh <- tk1.ExecToErr("commit")
<-doneCh
errCh <- tk2.ExecToErr("commit")
tk1.MustExec(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName))
tk1.MustExec("commit")
wg3.Wait()
tk2.MustExec("commit")
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
"1 2 1",
"2 2 2",
"1 1 10",
))

// update
errCh <- tk1.ExecToErr("begin pessimistic")
errCh <- tk2.ExecToErr("begin pessimistic")
tk1.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
// lock exist key
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 3 where id = 2 and v = 2", tableName))
tk1.MustExec(fmt.Sprintf("update %s set v = 3 where id = 2 and v = 2", tableName))
// read committed will not lock non-exist key
if rc {
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v =4 where id = 3 and v = 3", tableName))
tk1.MustExec(fmt.Sprintf("update %s set v =4 where id = 3 and v = 3", tableName))
}
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName))
go func() {
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(2, 2, 20)", tableName))
doneCh <- struct{}{}
}()
tk2.MustExec(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName))
var wg2 util.WaitGroupWrapper
wg2.Run(func() {
tk2.MustExec(fmt.Sprintf("insert into %s values(2, 2, 20)", tableName))
})
time.Sleep(150 * time.Millisecond)
errCh <- tk1.ExecToErr("commit")
<-doneCh
errCh <- tk2.ExecToErr("commit")
tk1.MustExec("commit")
wg2.Wait()
tk2.MustExec("commit")
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
"1 2 1",
"2 3 2",
Expand All @@ -757,23 +755,23 @@ func TestPointGetLockExistKey(t *testing.T) {
))

// delete
errCh <- tk1.ExecToErr("begin pessimistic")
errCh <- tk2.ExecToErr("begin pessimistic")
tk1.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
// lock exist key
errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where id = 3 and v = 3", tableName))
tk1.MustExec(fmt.Sprintf("delete from %s where id = 3 and v = 3", tableName))
// read committed will not lock non-exist key
if rc {
errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where id = 4 and v = 4", tableName))
tk1.MustExec(fmt.Sprintf("delete from %s where id = 4 and v = 4", tableName))
}
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName))
go func() {
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName))
doneCh <- struct{}{}
}()
tk2.MustExec(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName))
var wg1 util.WaitGroupWrapper
wg1.Run(func() {
tk2.MustExec(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName))
})
time.Sleep(50 * time.Millisecond)
errCh <- tk1.ExecToErr("commit")
<-doneCh
errCh <- tk2.ExecToErr("commit")
tk1.MustExec("commit")
wg1.Wait()
tk2.MustExec("commit")
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
"1 2 1",
"2 3 2",
Expand All @@ -782,7 +780,6 @@ func TestPointGetLockExistKey(t *testing.T) {
"4 4 4",
"3 3 30",
))
wg.Done()
}

for i, one := range []struct {
Expand All @@ -794,18 +791,12 @@ func TestPointGetLockExistKey(t *testing.T) {
{rc: true, key: "primary key"},
{rc: true, key: "unique key"},
} {
wg.Add(1)
tableName := fmt.Sprintf("t_%d", i)
go testLock(one.rc, one.key, tableName)
}

go func() {
wg.Wait()
close(errCh)
}()
for err := range errCh {
require.NoError(t, err)
wg.Run(func() {
testLock(one.rc, one.key, tableName)
})
}
wg.Wait()
}

func TestWithTiDBSnapshot(t *testing.T) {
Expand Down

0 comments on commit b88d62e

Please sign in to comment.