Skip to content

Commit

Permalink
update test
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu committed Mar 12, 2021
1 parent ff4026d commit bba1eb9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 16 deletions.
21 changes: 11 additions & 10 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

type tiflashTestSuite struct {
Expand All @@ -42,6 +43,7 @@ type tiflashTestSuite struct {
}

func (s *tiflashTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
s.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
Expand Down Expand Up @@ -71,6 +73,7 @@ func (s *tiflashTestSuite) SetUpSuite(c *C) {
}

func (s *tiflashTestSuite) TestReadPartitionTable(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -98,6 +101,7 @@ func (s *tiflashTestSuite) TestReadPartitionTable(c *C) {
}

func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -133,6 +137,7 @@ func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) {
}

func (s *tiflashTestSuite) TestMppExecution(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -191,6 +196,7 @@ func (s *tiflashTestSuite) TestMppExecution(c *C) {
}

func (s *tiflashTestSuite) TestPartitionTable(c *C) {
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -279,6 +285,7 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) {
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
defer testleak.AfterTest(c)()
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down Expand Up @@ -312,12 +319,11 @@ func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {

// all goroutines exit if one goroutine hangs but another return errors
func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
defer testleak.AfterTest(c)()
// mock non-root tasks return error
var mockTaskError = "github.com/pingcap/tidb/store/copr/mppExitFromErrors"
var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError"
// mock root tasks hang
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
// check all goroutines done
var exitDone = "github.com/pingcap/tidb/store/copr/mppExitsDone"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -338,19 +344,14 @@ func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
tk.MustExec("insert into t1 values(1,0)")
tk.MustExec("insert into t1 values(2,0)")
tk.MustExec("insert into t1 values(3,0)")
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
c.Assert(failpoint.Enable(exitDone, `return(true)`), IsNil)
c.Assert(failpoint.Enable(mockTaskError, `return(true)`), IsNil)
c.Assert(failpoint.Enable(mppNonRootTaskError, `return(true)`), IsNil)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)

// generate 2 root tasks, one will hang and another will return errors
err = tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
// check that all goroutine exit normally.
c.Assert(atomic.LoadUint32(&tk.Se.GetSessionVars().Killed), Equals, uint32(20))
c.Assert(failpoint.Disable(exitDone), IsNil)
c.Assert(failpoint.Disable(mockTaskError), IsNil)
c.Assert(failpoint.Disable(mppNonRootTaskError), IsNil)
c.Assert(failpoint.Disable(hang), IsNil)
}
7 changes: 1 addition & 6 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
m.sendError(errors.New(realResp.Error.Msg))
return
}
failpoint.Inject("mppExitFromErrors", func(val failpoint.Value) {
failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) {
if val.(bool) && !req.IsRoot {
time.Sleep(1 * time.Second)
m.sendError(tikv.ErrTiFlashServerTimeout)
Expand Down Expand Up @@ -350,11 +350,6 @@ func (m *mppIterator) Close() error {
}
m.cancelFunc()
m.wg.Wait()
failpoint.Inject("mppExitsDone", func(val failpoint.Value) {
if val.(bool) {
atomic.StoreUint32(m.vars.Killed, 20)
}
})
return nil
}

Expand Down

0 comments on commit bba1eb9

Please sign in to comment.