Skip to content

Commit

Permalink
copr: do not hide original error for MPP query that will not faillbac…
Browse files Browse the repository at this point in the history
…k to TiKV (#29342)
  • Loading branch information
windtalker authored Nov 3, 2021
1 parent d7a6554 commit a0cd121
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 17 deletions.
3 changes: 2 additions & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
11 changes: 11 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/stretchr/testify/require"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/testutils"
)

Expand Down Expand Up @@ -858,6 +859,16 @@ func TestTiFlashFallback(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/establishMppConnectionErr", "return(true)"))
testFallbackWork(t, tk, cc, "select * from t t1 join t t2 on t1.a = t2.a")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/establishMppConnectionErr"))

// When fallback is not set, TiFlash mpp will return the original error message
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "return(true)"))
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
tk.MustExec("set @@tidb_allow_mpp=ON")
tk.MustExec("set @@tidb_enforce_mpp=ON")
tk.MustExec("set @@tidb_isolation_read_engines='tiflash,tidb'")
err = cc.handleQuery(ctx, "select count(*) from t")
require.Error(t, err)
require.NotEqual(t, err.Error(), tikverr.ErrTiFlashServerTimeout.Error())
}

func testFallbackWork(t *testing.T, tk *testkit.TestKit, cc *clientConn, sql string) {
Expand Down
49 changes: 34 additions & 15 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type mppIterator struct {

vars *tikv.Variables

needTriggerFallback bool

mu sync.Mutex
}

Expand Down Expand Up @@ -236,8 +238,12 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
// That's a hard job but we can try it in the future.
if sender.GetRPCError() != nil {
logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
err = derr.ErrTiFlashServerTimeout
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
err = derr.ErrTiFlashServerTimeout
} else {
err = sender.GetRPCError()
}
}
} else {
rpcResp, err = m.store.GetTiKVClient().SendRequest(ctx, req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutMedium)
Expand All @@ -258,8 +264,11 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req

if err != nil {
logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(derr.ErrTiFlashServerTimeout)
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
err = derr.ErrTiFlashServerTimeout
}
m.sendError(err)
return
}

Expand Down Expand Up @@ -345,8 +354,12 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques

if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(derr.ErrTiFlashServerTimeout)
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
m.sendError(derr.ErrTiFlashServerTimeout)
} else {
m.sendError(err)
}
return
}

Expand Down Expand Up @@ -378,7 +391,12 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
logutil.BgLogger().Info("stream unknown error", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
}
}
m.sendError(derr.ErrTiFlashServerTimeout)
// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
if m.needTriggerFallback {
m.sendError(derr.ErrTiFlashServerTimeout)
} else {
m.sendError(err)
}
return
}
}
Expand Down Expand Up @@ -470,17 +488,18 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
}

// DispatchMPPTasks dispatches all the mpp task and waits for the responses.
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest) kv.Response {
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool) kv.Response {
vars := variables.(*tikv.Variables)
ctxChild, cancelFunc := context.WithCancel(ctx)
iter := &mppIterator{
store: c.store,
tasks: dispatchReqs,
finishCh: make(chan struct{}),
cancelFunc: cancelFunc,
respChan: make(chan *mppResponse, 4096),
startTs: dispatchReqs[0].StartTs,
vars: vars,
store: c.store,
tasks: dispatchReqs,
finishCh: make(chan struct{}),
cancelFunc: cancelFunc,
respChan: make(chan *mppResponse, 4096),
startTs: dispatchReqs[0].StartTs,
vars: vars,
needTriggerFallback: needTriggerFallback,
}
go iter.run(ctxChild)
return iter
Expand Down

0 comments on commit a0cd121

Please sign in to comment.