MPP: Kill mpp queries #23056

Merged 21 commits on Mar 12, 2021

Changes from all commits
2 changes: 1 addition & 1 deletion distsql/distsql.go
@@ -30,7 +30,7 @@ import (

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, tasks)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
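The change above threads the session's KV variables into the MPP client so the response iterator can observe the kill flag. Below is a minimal sketch of that wiring with simplified stand-in types (the real definitions live in the kv and sessionctx/variable packages); treat it as an assumption-laden illustration, not the actual structs:

```go
package sketch

import "sync/atomic"

// Variables stands in for kv.Variables: lower layers only need a pointer
// to the kill word so they can poll it with atomic loads.
type Variables struct {
	Killed *uint32
}

// SessionVars stands in for the session variable struct: Killed is the
// word a kill flips, and KVVars.Killed aliases it for the storage layer.
type SessionVars struct {
	Killed uint32
	KVVars *Variables
}

func NewSessionVars() *SessionVars {
	sv := &SessionVars{}
	sv.KVVars = &Variables{Killed: &sv.Killed}
	return sv
}

// IsKilled reports whether the query on this session has been killed.
func (sv *SessionVars) IsKilled() bool {
	return atomic.LoadUint32(sv.KVVars.Killed) == 1
}
```

DispatchMPPTasks hands these variables to the mppIterator below, whose nextImpl polls Killed once per tick and surfaces ErrQueryInterrupted when it is set.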
1 change: 1 addition & 0 deletions executor/mpp_gather.go
@@ -82,6 +82,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
79 changes: 79 additions & 0 deletions executor/tiflash_test.go
@@ -15,18 +15,25 @@ package executor_test

import (
"fmt"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

type tiflashTestSuite struct {
@@ -36,6 +43,7 @@
}

func (s *tiflashTestSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
var err error
s.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
@@ -271,3 +279,74 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) {
failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks")
failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP")
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("insert into t values(4,0)")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
c.Assert(int(terror.ToSQLError(errors.Cause(err).(*terror.Error)).Code), Equals, int(executor.ErrQueryInterrupted.Code()))
}()
time.Sleep(1 * time.Second)
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1)
wg.Wait()
c.Assert(failpoint.Disable(hang), IsNil)
}
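The test above simulates the kill by storing 1 into the session's Killed field directly from another goroutine after a short sleep. A tiny helper in the same spirit (the helper name and the delayed-kill shape are illustrative, not from the patch; in production the flag is set by the server's kill handling rather than a timer):

```go
// killAfter flips the session kill flag after d, mirroring what the test
// does inline with a goroutine plus a 1-second sleep. Sketch only.
func killAfter(vars *variable.SessionVars, d time.Duration) *time.Timer {
	return time.AfterFunc(d, func() {
		atomic.StoreUint32(&vars.Killed, 1)
	})
}
```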

// all goroutines exit if one goroutine hangs but another returns errors
func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
defer testleak.AfterTest(c)()
// mock non-root tasks return error
var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError"
// mock root tasks hang
var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int not null primary key, b int not null)")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t values(1,0)")
tk.MustExec("insert into t values(2,0)")
tk.MustExec("insert into t values(3,0)")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int not null primary key, b int not null)")
tk.MustExec("alter table t1 set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "t1")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("insert into t1 values(1,0)")
tk.MustExec("insert into t1 values(2,0)")
tk.MustExec("insert into t1 values(3,0)")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
tk.MustExec("set @@session.tidb_allow_mpp=ON")
c.Assert(failpoint.Enable(mppNonRootTaskError, `return(true)`), IsNil)
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)

// generate 2 root tasks, one will hang and another will return errors
err = tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a")
c.Assert(err, NotNil)
c.Assert(failpoint.Disable(mppNonRootTaskError), IsNil)
c.Assert(failpoint.Disable(hang), IsNil)
}
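Both tests lean on the same failpoint pattern: enable an injection point by its full package path, run the query, and always disable it afterwards; the hooks themselves (mppRecvHang in unistore, mppNonRootTaskError in store/copr) fire only while enabled. A condensed sketch of the two halves of that pattern, taken from the code in this diff; note that the failpoint markers only take effect in builds where they have been switched on (make failpoint-enable):

```go
// Code-under-test side (indicative): the closure runs only while the
// failpoint is enabled, and val carries the term passed to Enable.
failpoint.Inject("mppRecvHang", func(val failpoint.Value) {
	if b, ok := val.(bool); ok && b {
		time.Sleep(time.Second) // pretend the receiver is stuck
	}
})

// Test side: enable before the query, disable unconditionally afterwards.
hang := "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang"
c.Assert(failpoint.Enable(hang, `return(true)`), IsNil)
defer func() { c.Assert(failpoint.Disable(hang), IsNil) }()
```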
2 changes: 1 addition & 1 deletion go.mod
@@ -31,7 +31,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
5 changes: 3 additions & 2 deletions go.sum
@@ -353,8 +353,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb h1:2rGvEhflp/uK1l1rNUmoHA4CiHpbddHGxg52H71Fke8=
github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87 h1:lVRrhmqIT2zMbmoalrgxQLwWzFd3VtFaaWy0fnMwPro=
github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@@ -844,6 +844,7 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
17 changes: 16 additions & 1 deletion kv/mpp.go
@@ -45,6 +45,20 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta {
return meta
}

// MppTaskStates denotes the state of mpp tasks
type MppTaskStates uint8

const (
// MppTaskReady means the task is ready
MppTaskReady MppTaskStates = iota
// MppTaskRunning means the task is running
MppTaskRunning
// MppTaskCancelled means the task is cancelled
MppTaskCancelled
// MppTaskDone means the task is done
MppTaskDone
)

// MPPDispatchRequest stands for a dispatching task.
type MPPDispatchRequest struct {
Data []byte // data encodes the dag coprocessor request.
@@ -55,6 +69,7 @@ type MPPDispatchRequest struct {
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
}

// MPPClient accepts and processes mpp requests.
@@ -64,7 +79,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(context.Context, []*MPPDispatchRequest) Response
DispatchMPPTasks(context.Context, *Variables, []*MPPDispatchRequest) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
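The new MppTaskStates values drive a small per-request state machine: appendMPPDispatchReq creates requests as MppTaskReady, run() moves them to MppTaskRunning right before dispatch, and cancelMppTasks sweeps everything to MppTaskCancelled. A String method is a convenient way to summarize that lifecycle; it is not part of the patch, and the note on MppTaskDone is an assumption:

```go
// String is an illustrative helper, not in the PR; the comments record
// where each transition happens elsewhere in this change.
func (s MppTaskStates) String() string {
	switch s {
	case MppTaskReady:
		return "ready" // initial value, set in MPPGather.appendMPPDispatchReq
	case MppTaskRunning:
		return "running" // set by mppIterator.run just before dispatching
	case MppTaskCancelled:
		return "cancelled" // set for every task by mppIterator.cancelMppTasks
	case MppTaskDone:
		return "done" // presumably reserved for tasks that finish normally
	default:
		return "unknown"
	}
}
```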
98 changes: 78 additions & 20 deletions store/copr/mpp.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -123,15 +124,31 @@ type mppIterator struct {

respChan chan *mppResponse

rpcCancel *tikv.RPCCanceller
cancelFunc context.CancelFunc

wg sync.WaitGroup

closed uint32

vars *kv.Variables

mu sync.Mutex
}

func (m *mppIterator) run(ctx context.Context) {
for _, task := range m.tasks {
if atomic.LoadUint32(&m.closed) == 1 {
break
}
m.mu.Lock()
switch task.State {
Review comment (Member): state should be an atomic type so that we don't have to use a mutex.

Reply (Contributor, author): the mutex has another job: it ensures cancelMppTasks is called exclusively. Besides, there is no atomic enum type.
case kv.MppTaskReady:
task.State = kv.MppTaskRunning
m.mu.Unlock()
default:
m.mu.Unlock()
break
}
m.wg.Add(1)
bo := tikv.NewBackoffer(ctx, copNextMaxBackoff)
go m.handleDispatchReq(ctx, bo, task)
Expand All @@ -142,6 +159,7 @@ func (m *mppIterator) run(ctx context.Context) {

func (m *mppIterator) sendError(err error) {
m.sendToRespCh(&mppResponse{err: err})
m.cancelMppTasks()
}

func (m *mppIterator) sendToRespCh(resp *mppResponse) (exit bool) {
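On the review thread above (atomic state vs mutex): if State were stored as a plain uint32, the Ready-to-Running hand-off in run() could indeed be a single compare-and-swap, though a lock would still be needed to keep cancelMppTasks exclusive. A sketch of that alternative under those assumptions; it is not what the patch does:

```go
// Alternative sketch only: a uint32-backed state with a CAS hand-off.
const (
	taskReady uint32 = iota
	taskRunning
	taskCancelled
)

// tryStart succeeds for exactly one caller while the task is still ready,
// which is the property the mutex-protected switch in run() provides.
func tryStart(state *uint32) bool {
	return atomic.CompareAndSwapUint32(state, taskReady, taskRunning)
}
```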
Expand Down Expand Up @@ -223,14 +241,53 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
m.sendError(errors.New(realResp.Error.Msg))
return
}

failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) {
if val.(bool) && !req.IsRoot {
time.Sleep(1 * time.Second)
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}
})
if !req.IsRoot {
return
}

m.establishMPPConns(bo, req, taskMeta)
}

// NOTE: We do not retry here, because retrying is pointless when the errors come from TiFlash or the network; if errors occur, the execution on TiFlash will stop by itself after a few minutes.
// This function must be called exclusively: only the first call sends the cancel requests and marks all tasks as cancelled; subsequent calls do nothing.
func (m *mppIterator) cancelMppTasks() {
m.mu.Lock()
defer m.mu.Unlock()
killReq := &mpp.CancelTaskRequest{
Meta: &mpp.TaskMeta{StartTs: m.startTs},
}

wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{})
Review comment (Member): As we discussed, put it inside the iter so that we avoid redundant cancels.

Reply (Contributor, author): Here I use another way, as described in the comment at the top of this function.
wrappedReq.StoreTp = kv.TiFlash

usedStoreAddrs := make(map[string]bool)
for _, task := range m.tasks {
// get the store address of running tasks
if task.State == kv.MppTaskRunning && !usedStoreAddrs[task.Meta.GetAddress()] {
usedStoreAddrs[task.Meta.GetAddress()] = true
} else if task.State == kv.MppTaskCancelled {
return
}
task.State = kv.MppTaskCancelled
}

// send cancel cmd to all stores where tasks run
for addr := range usedStoreAddrs {
_, err := m.store.GetTiKVClient().SendRequest(context.Background(), addr, wrappedReq, tikv.ReadTimeoutUltraLong)
logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", m.startTs), zap.String(" on addr ", addr))
if err != nil {
logutil.BgLogger().Error("cancel task error: ", zap.Error(err), zap.Uint64(" for query id ", m.startTs), zap.String(" on addr ", addr))
}
}
}
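Two properties of cancelMppTasks are worth spelling out: it deduplicates by store address so each TiFlash node receives one CancelTaskRequest keyed by the query's start timestamp, and the early return on MppTaskCancelled means only the first caller does any work. Here is the bookkeeping pulled out as a standalone sketch (the helper name is illustrative; like the original loop it assumes the caller holds m.mu):

```go
// cancelTargets mirrors the loop above: it returns the distinct addresses
// of stores running tasks for this query, or nil if a previous cancel
// already marked the tasks, and flips every task to MppTaskCancelled.
func cancelTargets(tasks []*kv.MPPDispatchRequest) []string {
	seen := make(map[string]bool)
	var addrs []string
	for _, task := range tasks {
		if task.State == kv.MppTaskCancelled {
			return nil // an earlier cancel already handled everything
		}
		if task.State == kv.MppTaskRunning && !seen[task.Meta.GetAddress()] {
			seen[task.Meta.GetAddress()] = true
			addrs = append(addrs, task.Meta.GetAddress())
		}
		task.State = kv.MppTaskCancelled
	}
	return addrs
}
```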

func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
connReq := &mpp.EstablishMPPConnectionRequest{
SenderMeta: taskMeta,
Expand Down Expand Up @@ -260,13 +317,13 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
return
}

// TODO: cancel the whole process when some error happens
for {
err := m.handleMPPStreamResponse(bo, resp, req)
if err != nil {
m.sendError(err)
return
}

resp, err = stream.Recv()
if err != nil {
if errors.Cause(err) == io.EOF {
Expand All @@ -280,9 +337,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
}
m.sendToRespCh(&mppResponse{
err: tikv.ErrTiFlashServerTimeout,
})
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}
}
@@ -293,7 +348,7 @@ func (m *mppIterator) Close() error {
if atomic.CompareAndSwapUint32(&m.closed, 0, 1) {
close(m.finishCh)
}
m.rpcCancel.CancelAll()
m.cancelFunc()
Review comment (Member): Test this logic: two root tasks, one returns an error and the other hangs. Without a kill, we can end the hanging goroutine just by closing the resources correctly.

Reply (Contributor, author): Added; covered by TestMppGoroutinesExitFromErrors above.
m.wg.Wait()
return nil
}
@@ -336,7 +391,11 @@ func (m *mppIterator) nextImpl(ctx context.Context) (resp *mppResponse, ok bool,
case resp, ok = <-m.respChan:
return
case <-ticker.C:
//TODO: kill query
if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 {
err = tikv.ErrQueryInterrupted
exit = true
return
}
case <-m.finishCh:
exit = true
return
@@ -370,19 +429,18 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return resp, nil
}

// DispatchMPPTasks dispatches all the mpp task and waits for the reponses.
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, dispatchReqs []*kv.MPPDispatchRequest) kv.Response {
// DispatchMPPTasks dispatches all the mpp task and waits for the responses.
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, vars *kv.Variables, dispatchReqs []*kv.MPPDispatchRequest) kv.Response {
ctxChild, cancelFunc := context.WithCancel(ctx)
iter := &mppIterator{
store: c.store,
tasks: dispatchReqs,
finishCh: make(chan struct{}),
rpcCancel: tikv.NewRPCanceller(),
respChan: make(chan *mppResponse, 4096),
startTs: dispatchReqs[0].StartTs,
store: c.store,
tasks: dispatchReqs,
finishCh: make(chan struct{}),
cancelFunc: cancelFunc,
respChan: make(chan *mppResponse, 4096),
startTs: dispatchReqs[0].StartTs,
vars: vars,
}
ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, iter.rpcCancel)

// TODO: Process the case of query cancellation.
go iter.run(ctx)
go iter.run(ctxChild)
return iter
}
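DispatchMPPTasks now derives a child context and keeps its cancel function on the iterator, replacing the old RPC canceller: Close() calls cancelFunc, every dispatch and stream goroutine inherits the child context, and wg.Wait() then returns because those goroutines observe the cancellation. A minimal, self-contained illustration of that shutdown pattern (names are generic, not from the patch):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker blocks on a pretend remote stream until the shared context is
// cancelled, the same way the MPP goroutines unblock after Close().
func worker(ctx context.Context, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	select {
	case <-time.After(time.Hour): // the remote side "hangs"
	case <-ctx.Done():
		fmt.Printf("worker %d stopped: %v\n", id, ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go worker(ctx, i, &wg)
	}
	cancel()  // what mppIterator.Close now does via m.cancelFunc
	wg.Wait() // returns promptly because every worker saw ctx.Done()
}
```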
2 changes: 2 additions & 0 deletions store/mockstore/unistore/cophandler/cop_handler.go
@@ -15,6 +15,7 @@ package cophandler

import (
"bytes"
"context"
"fmt"
"time"

@@ -46,6 +47,7 @@ type MPPCtx struct {
RPCClient client.Client
StoreAddr string
TaskHandler *MPPTaskHandler
Ctx context.Context
}

// HandleCopRequest handles coprocessor request.
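The new Ctx field gives the mock store's MPP handler a context tied to the query, which is what lets the mppRecvHang failpoint used by the tests hang without leaking goroutines. The hook itself lives in the ngaut/unistore module (version bumped in go.mod above), so it is not visible in this diff; the following is only a guess at its shape:

```go
// Hypothetical sketch of the unistore-side hook; the real code may differ.
// While mppRecvHang is enabled, block instead of returning data, but keep
// watching the query context so a cancel or kill still ends the hang.
failpoint.Inject("mppRecvHang", func(val failpoint.Value) {
	if b, ok := val.(bool); ok && b {
		for {
			select {
			case <-mppCtx.Ctx.Done():
				return // query cancelled: stop hanging
			case <-time.After(100 * time.Millisecond):
				// still "hanging"; loop until the context is cancelled
			}
		}
	}
})
```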