Skip to content

Commit

Permalink
mpp: Fix the crash or error when mpp generate empty task list. (#31658)…
Browse files Browse the repository at this point in the history
… (#31696)

close #31636
  • Loading branch information
ti-srebot authored Apr 14, 2022
1 parent d4ea430 commit 58d1bd9
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 11 deletions.
4 changes: 2 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks)
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, startTs)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, startTs uint64) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
5 changes: 1 addition & 4 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,14 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
for _, r := range f.ExchangeReceivers {
childrenTasks = append(childrenTasks, r.Tasks...)
}
if f.singleton {
if f.singleton && len(childrenTasks) > 0 {
childrenTasks = childrenTasks[0:1]
}
tasks = e.constructMPPTasksByChildrenTasks(childrenTasks)
}
if err != nil {
return nil, errors.Trace(err)
}
if len(tasks) == 0 {
return nil, errors.New("cannot find mpp task")
}
for _, r := range f.ExchangeReceivers {
for _, frag := range r.frags {
frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...)
Expand Down
5 changes: 4 additions & 1 deletion store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
Expand Down Expand Up @@ -317,7 +321,6 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges
const cmdType = tikvrpc.CmdBatchCop
rangesLen := ranges.Len()
for {

locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
Expand Down
37 changes: 37 additions & 0 deletions store/copr/batch_coprocessor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package copr

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
}
4 changes: 2 additions & 2 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
}

// DispatchMPPTasks dispatches all the mpp task and waits for the responses.
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest) kv.Response {
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, startTs uint64) kv.Response {
vars := variables.(*tikv.Variables)
ctxChild, cancelFunc := context.WithCancel(ctx)
iter := &mppIterator{
Expand All @@ -466,7 +466,7 @@ func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{},
finishCh: make(chan struct{}),
cancelFunc: cancelFunc,
respChan: make(chan *mppResponse, 4096),
startTs: dispatchReqs[0].StartTs,
startTs: startTs,
vars: vars,
}
go iter.run(ctxChild)
Expand Down

0 comments on commit 58d1bd9

Please sign in to comment.