From 58d1bd97c6cd6bb6862b0d15beed83d678210e69 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 14 Apr 2022 17:28:36 +0800 Subject: [PATCH] mpp: Fix the crash or error when mpp generate empty task list. (#31658) (#31696) close pingcap/tidb#31636 --- distsql/distsql.go | 4 +-- executor/mpp_gather.go | 2 +- kv/mpp.go | 2 +- planner/core/fragment.go | 5 +--- store/copr/batch_coprocessor.go | 5 +++- store/copr/batch_coprocessor_test.go | 37 ++++++++++++++++++++++++++++ store/copr/mpp.go | 4 +-- 7 files changed, 48 insertions(+), 11 deletions(-) create mode 100644 store/copr/batch_coprocessor_test.go diff --git a/distsql/distsql.go b/distsql/distsql.go index 9cac775ff037d..66761d3899c7f 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -32,8 +32,8 @@ import ( ) // DispatchMPPTasks dispatches all tasks and returns an iterator. -func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) { - resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks) +func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) { + resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, startTs) if resp == nil { err := errors.New("client returns nil response") return nil, err diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index e517a0130ec4f..3a3bba004681d 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -117,7 +117,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs))) } }) - e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id) + e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS) if err != nil { return errors.Trace(err) } diff --git a/kv/mpp.go b/kv/mpp.go index c3965831ae05b..22dc7201a17c4 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -80,7 +80,7 @@ type MPPClient interface { ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error) // DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. - DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest) Response + DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, startTs uint64) Response } // MPPBuildTasksRequest request the stores allocation for a mpp plan fragment. diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 4c53260900670..428d21ca38ce0 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -257,7 +257,7 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv for _, r := range f.ExchangeReceivers { childrenTasks = append(childrenTasks, r.Tasks...) } - if f.singleton { + if f.singleton && len(childrenTasks) > 0 { childrenTasks = childrenTasks[0:1] } tasks = e.constructMPPTasksByChildrenTasks(childrenTasks) @@ -265,9 +265,6 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv if err != nil { return nil, errors.Trace(err) } - if len(tasks) == 0 { - return nil, errors.New("cannot find mpp task") - } for _, r := range f.ExchangeReceivers { for _, frag := range r.frags { frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 54c6e2676b85d..2ac84fdaabe0f 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -104,6 +104,10 @@ func (rs *batchCopResponse) RespTime() time.Duration { // if there is only 1 available store, then put the region to the related store // otherwise, use a greedy algorithm to put it into the store with highest weight func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask { + if len(originalTasks) == 0 { + log.Info("Batch cop task balancer got an empty task set.") + return originalTasks + } isMPP := mppStoreLastFailTime != nil // for mpp, we still need to detect the store availability if len(originalTasks) <= 1 && !isMPP { @@ -317,7 +321,6 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges const cmdType = tikvrpc.CmdBatchCop rangesLen := ranges.Len() for { - locations, err := cache.SplitKeyRangesByLocations(bo, ranges) if err != nil { return nil, errors.Trace(err) diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go new file mode 100644 index 0000000000000..3164006ba1292 --- /dev/null +++ b/store/copr/batch_coprocessor_test.go @@ -0,0 +1,37 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package copr + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) { + { + var nilTaskSet []*batchCopTask + nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second) + require.True(t, nilResult == nil) + } + + { + emptyTaskSet := make([]*batchCopTask, 0) + emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second) + require.True(t, emptyResult != nil) + require.True(t, len(emptyResult) == 0) + } +} diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 1b8736d0072df..25c02fedbd29c 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -457,7 +457,7 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) { } // DispatchMPPTasks dispatches all the mpp task and waits for the responses. -func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest) kv.Response { +func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, startTs uint64) kv.Response { vars := variables.(*tikv.Variables) ctxChild, cancelFunc := context.WithCancel(ctx) iter := &mppIterator{ @@ -466,7 +466,7 @@ func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, finishCh: make(chan struct{}), cancelFunc: cancelFunc, respChan: make(chan *mppResponse, 4096), - startTs: dispatchReqs[0].StartTs, + startTs: startTs, vars: vars, } go iter.run(ctxChild)