Commit
disttask: change random selection of the TiDB executor node to round-robin selection (#43440)

close #43439
Benjamin2037 committed Apr 27, 2023
1 parent 0b11444 commit d901924
Showing 3 changed files with 38 additions and 17 deletions.
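Before the file-by-file diff, it helps to see the scheduling idea in isolation: instead of picking a random TiDB node for every subtask (the old rand.Intn path removed below), the dispatcher now resolves the list of live nodes once and assigns subtask i to node i modulo the node count. The following is a minimal, self-contained Go sketch of that round-robin rule; the node type and the pickExecutor helper are illustrative stand-ins, not the TiDB API.

package main

import (
    "errors"
    "fmt"
)

// node is an illustrative stand-in for a TiDB server entry.
type node struct{ id string }

// pickExecutor maps a subtask position onto the node list, wrapping around so
// subtasks are spread evenly across the available nodes (round robin).
func pickExecutor(nodes []node, pos int) (string, error) {
    if len(nodes) == 0 {
        return "", errors.New("no available node")
    }
    if pos < 0 {
        return "", fmt.Errorf("invalid position %d", pos)
    }
    return nodes[pos%len(nodes)].id, nil
}

func main() {
    nodes := []node{{"tidb-0"}, {"tidb-1"}, {"tidb-2"}}
    for i := 0; i < 5; i++ {
        id, _ := pickExecutor(nodes, i)
        fmt.Printf("subtask %d -> %s\n", i, id)
    }
}

For a fixed node list this assignment is deterministic and spreads work evenly, whereas the previous random pick could land many subtasks on the same node by chance.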
6 changes: 5 additions & 1 deletion ddl/scheduler.go
@@ -132,7 +132,11 @@ func (b *backfillSchedulerHandle) SplitSubtask(_ context.Context, subtask []byte
     pid := sm.PhysicalTableID
     parTbl := b.ptbl.(table.PartitionedTable)
 
-    startKey, endKey, err := getTableRange(b.jc, d.ddlCtx, parTbl.GetPartition(pid), b.job.SnapshotVer, b.job.Priority)
+    currentVer, err1 := getValidCurrentVersion(d.store)
+    if err1 != nil {
+        return nil, errors.Trace(err1)
+    }
+    startKey, endKey, err := getTableRange(b.jc, d.ddlCtx, parTbl.GetPartition(pid), currentVer.Ver, b.job.Priority)
     if err != nil {
         logutil.BgLogger().Error("[ddl] get table range error", zap.Error(err))
         return nil, err
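The ddl/scheduler.go hunk above changes which snapshot the backfill subtask scans: rather than reusing the version recorded in the DDL job (b.job.SnapshotVer), it asks the store for its current version via getValidCurrentVersion and passes currentVer.Ver to getTableRange. The sketch below only illustrates that shape; the version type, store type, and the validity check are assumptions standing in for the TiDB helpers, not their real implementation.

package main

import (
    "errors"
    "fmt"
)

// version and store are stand-ins for the kv-layer types used by the DDL code.
type version struct{ Ver uint64 }

type store struct{ currentTS uint64 }

// validCurrentVersion mimics the role of getValidCurrentVersion: fetch the
// store's current version and reject an obviously invalid (zero) one, so the
// subtask split fails early instead of scanning at a bogus snapshot.
func validCurrentVersion(s *store) (version, error) {
    if s.currentTS == 0 {
        return version{}, errors.New("invalid current version")
    }
    return version{Ver: s.currentTS}, nil
}

func main() {
    s := &store{currentTS: 441_000_000_000}
    ver, err := validCurrentVersion(s)
    if err != nil {
        fmt.Println("split subtask failed:", err)
        return
    }
    // The range scan would now run at this freshly fetched snapshot version.
    fmt.Println("computing table range at version", ver.Ver)
}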
42 changes: 28 additions & 14 deletions disttask/framework/dispatcher/dispatcher.go
@@ -16,7 +16,7 @@ package dispatcher
 
 import (
     "context"
-    "math/rand"
+    "fmt"
     "time"
 
     "github.com/pingcap/errors"
@@ -401,9 +401,14 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) (err error) {
         return nil
     }
 
+    // Generate all available TiDB nodes for this global task.
+    serverNodes, err1 := GenerateSchedulerNodes(d.ctx)
+    if err1 != nil {
+        return err1
+    }
     subTasks := make([]*proto.Subtask, 0, len(metas))
-    for _, meta := range metas {
-        instanceID, err := GetEligibleInstance(d.ctx)
+    for i, meta := range metas {
+        instanceID, err := GetEligibleInstance(serverNodes, i)
         if err != nil {
             logutil.BgLogger().Warn("get a eligible instance failed", zap.Int64("gTask ID", gTask.ID), zap.Error(err))
             return err
@@ -414,24 +419,33 @@
 }
 
 // GetEligibleInstance gets an eligible instance.
-func GetEligibleInstance(ctx context.Context) (string, error) {
+func GetEligibleInstance(serverNodes []*infosync.ServerInfo, pos int) (string, error) {
+    if len(serverNodes) == 0 {
+        return "", errors.New("no available TiDB node")
+    }
+    if pos < 0 {
+        errMsg := fmt.Sprintf("available TiDB nodes range is 0 to %d, but request position is %d", len(serverNodes)-1, pos)
+        return "", errors.New(errMsg)
+    }
+    pos = pos % len(serverNodes)
+    return serverNodes[pos].ID, nil
+}
+
+// GenerateSchedulerNodes generates the eligible TiDB nodes for dispatching.
+func GenerateSchedulerNodes(ctx context.Context) ([]*infosync.ServerInfo, error) {
     serverInfos, err := infosync.GetAllServerInfo(ctx)
     if err != nil {
-        return "", err
+        return nil, err
     }
     if len(serverInfos) == 0 {
-        return "", errors.New("not found instance")
+        return nil, errors.New("not found instance")
     }
 
     // TODO: Consider valid instances, and then consider scheduling strategies.
-    num := rand.Intn(len(serverInfos))
-    for _, info := range serverInfos {
-        if num == 0 {
-            return info.ID, nil
-        }
-        num--
+    serverNodes := make([]*infosync.ServerInfo, 0, len(serverInfos))
+    for _, serverInfo := range serverInfos {
+        serverNodes = append(serverNodes, serverInfo)
     }
-    return "", errors.New("not found instance")
+    return serverNodes, nil
 }

// GetAllSchedulerIDs gets all the scheduler IDs.
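One detail worth noting about the new GenerateSchedulerNodes: it flattens the map returned by infosync.GetAllServerInfo into a slice, and Go randomizes map iteration order, so the slice order (and therefore which node receives subtask 0) can differ from one global task to the next, while staying fixed within a single task. The tiny sketch below, with stand-in node IDs rather than real server info, shows the even spread you get for one such snapshot of the list.

package main

import "fmt"

func main() {
    // Stand-in node IDs; their order is whatever the map snapshot produced.
    nodes := []string{"uuid-a", "uuid-b", "uuid-c"}
    counts := map[string]int{}
    const subtasks = 10
    for i := 0; i < subtasks; i++ {
        // Same wrap-around rule as GetEligibleInstance: node = pos % len(nodes).
        counts[nodes[i%len(nodes)]]++
    }
    // Every node gets either floor(10/3) or ceil(10/3) subtasks.
    fmt.Println(counts) // map[uuid-a:4 uuid-b:3 uuid-c:3]
}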
7 changes: 5 additions & 2 deletions disttask/framework/dispatcher/dispatcher_test.go
@@ -69,7 +69,8 @@ func TestGetInstance(t *testing.T) {
     // test no server
     mockedAllServerInfos := map[string]*infosync.ServerInfo{}
     require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
-    instanceID, err := dispatcher.GetEligibleInstance(ctx)
+    serverNodes, err := dispatcher.GenerateSchedulerNodes(ctx)
+    instanceID, _ := dispatcher.GetEligibleInstance(serverNodes, 0)
     require.Lenf(t, instanceID, 0, "instanceID:%d", instanceID)
     require.EqualError(t, err, "not found instance")
     instanceIDs, err := dsp.GetAllSchedulerIDs(ctx, 1)
@@ -89,7 +90,9 @@
         },
     }
     require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
-    instanceID, err = dispatcher.GetEligibleInstance(ctx)
+    serverNodes, err = dispatcher.GenerateSchedulerNodes(ctx)
+    require.NoError(t, err)
+    instanceID, err = dispatcher.GetEligibleInstance(serverNodes, 0)
     require.NoError(t, err)
     if instanceID != uuids[0] && instanceID != uuids[1] {
         require.FailNowf(t, "expected uuids:%d,%d, actual uuid:%d", uuids[0], uuids[1], instanceID)
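The updated test covers the no-server and happy paths through GenerateSchedulerNodes and GetEligibleInstance. A hedged sketch of one further case that could pin down the position handling (a hypothetical addition using the same imports as dispatcher_test.go, not part of this commit):

func TestGetEligibleInstancePosition(t *testing.T) {
    nodes := []*infosync.ServerInfo{{ID: "uuid-0"}}
    // A negative subtask position should be rejected rather than wrap around.
    _, err := dispatcher.GetEligibleInstance(nodes, -1)
    require.Error(t, err)
    // Any non-negative position wraps onto the available nodes.
    id, err := dispatcher.GetEligibleInstance(nodes, 5)
    require.NoError(t, err)
    require.Equal(t, "uuid-0", id)
}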
