disttask: refine manager retry (#51071)
ref #49008
ywqzzy authored Feb 26, 2024
1 parent 6af4bba commit f35778f
Showing 3 changed files with 36 additions and 47 deletions.
1 change: 1 addition & 0 deletions pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -53,6 +53,7 @@ go_test(
         "//pkg/disttask/framework/mock",
         "//pkg/disttask/framework/mock/execute",
         "//pkg/disttask/framework/proto",
+        "//pkg/disttask/framework/scheduler",
         "//pkg/disttask/framework/storage",
         "//pkg/disttask/framework/testutil",
         "//pkg/kv",
75 changes: 31 additions & 44 deletions pkg/disttask/framework/taskexecutor/manager.go
@@ -24,10 +24,13 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/pkg/config"
+	"github.com/pingcap/tidb/pkg/disttask/framework/handle"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
+	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
 	"github.com/pingcap/tidb/pkg/disttask/framework/storage"
 	"github.com/pingcap/tidb/pkg/metrics"
 	tidbutil "github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/backoff"
 	"github.com/pingcap/tidb/pkg/util/cpu"
 	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/memory"
@@ -46,8 +49,6 @@
 	MaxSubtaskCheckInterval = 2 * time.Second
 	maxChecksWhenNoSubtask  = 7
 	recoverMetaInterval     = 90 * time.Second
-	retrySQLTimes           = 30
-	retrySQLInterval        = 500 * time.Millisecond
 	unfinishedSubtaskStates = []proto.SubtaskState{
 		proto.SubtaskStatePending,
 		proto.SubtaskStateRunning,
@@ -110,44 +111,16 @@ func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager,
 // InitMeta initializes the meta of the Manager.
 // not a must-success step before start manager,
 // manager will try to recover meta periodically.
-func (m *Manager) InitMeta() (err error) {
-	for i := 0; i < retrySQLTimes; i++ {
-		err = m.taskTable.InitMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
-		if err == nil {
-			break
-		}
-		if err1 := m.ctx.Err(); err1 != nil {
-			return err1
-		}
-		if i%10 == 0 {
-			m.logger.Warn("start manager failed",
-				zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
-				zap.Int("retry times", i),
-				zap.Error(err))
-		}
-		time.Sleep(retrySQLInterval)
-	}
-	return err
+func (m *Manager) InitMeta() error {
+	return m.runWithRetry(func() error {
+		return m.taskTable.InitMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
+	}, "init meta failed")
 }
 
-func (m *Manager) recoverMeta() (err error) {
-	for i := 0; i < retrySQLTimes; i++ {
-		err = m.taskTable.RecoverMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
-		if err == nil {
-			break
-		}
-		if err1 := m.ctx.Err(); err1 != nil {
-			return err1
-		}
-		if i%10 == 0 {
-			m.logger.Warn("recover meta failed",
-				zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
-				zap.Int("retry times", i),
-				zap.Error(err))
-		}
-		time.Sleep(retrySQLInterval)
-	}
-	return err
+func (m *Manager) recoverMeta() error {
+	return m.runWithRetry(func() error {
+		return m.taskTable.RecoverMeta(m.ctx, m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
+	}, "recover meta failed")
 }
 
 // Start starts the Manager.
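
The hunk above is the heart of the change: two copies of a hand-rolled retry loop (fixed 500ms sleep, context check, periodic warn) collapse into calls to a shared m.runWithRetry helper that takes the operation as a closure plus a log message; the helper itself appears further down in this diff. A minimal, self-contained sketch of that consolidation pattern — the names below are illustrative stand-ins, not TiDB's API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithRetry retries fn until it succeeds, the attempt budget is spent,
// or ctx is canceled — the same shape as the loops removed above.
func runWithRetry(ctx context.Context, attempts int, interval time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr // stop retrying once the context is done
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	calls := 0
	// initMeta stands in for taskTable.InitMeta: fails twice, then succeeds.
	initMeta := func() error {
		calls++
		if calls < 3 {
			return errors.New("transient SQL error")
		}
		return nil
	}
	err := runWithRetry(context.Background(), 5, 10*time.Millisecond, initMeta)
	fmt.Println(err, "after", calls, "calls") // prints: <nil> after 3 calls
}

Passing the varying operation as a closure is what lets InitMeta and recoverMeta shrink to four lines each while keeping one retry policy in one place.
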
@@ -318,13 +291,13 @@ func (m *Manager) startTaskExecutor(task *proto.Task) {
 	factory := GetTaskExecutorFactory(task.Type)
 	if factory == nil {
 		err := errors.Errorf("task type %s not found", task.Type)
-		m.logErrAndPersist(err, task.ID, nil)
+		m.failSubtask(err, task.ID, nil)
 		return
 	}
 	executor := factory(m.ctx, m.id, task, m.taskTable)
 	err := executor.Init(m.ctx)
 	if err != nil {
-		m.logErrAndPersist(err, task.ID, executor)
+		m.failSubtask(err, task.ID, executor)
 		return
 	}
 	m.addTaskExecutor(executor)
@@ -374,17 +347,31 @@ func (m *Manager) logErr(err error) {
 	m.logger.Error("task manager met error", zap.Error(err), zap.Stack("stack"))
 }
 
-func (m *Manager) logErrAndPersist(err error, taskID int64, taskExecutor TaskExecutor) {
+func (m *Manager) failSubtask(err error, taskID int64, taskExecutor TaskExecutor) {
 	m.logErr(err)
 	// TODO we want to define err of taskexecutor.Init as fatal, but add-index have
 	// some code in Init that need retry, remove it after it's decoupled.
 	if taskExecutor != nil && taskExecutor.IsRetryableError(err) {
 		m.logger.Error("met retryable err", zap.Error(err), zap.Stack("stack"))
 		return
 	}
-	err1 := m.taskTable.FailSubtask(m.ctx, m.id, taskID, err)
-	if err1 != nil {
-		m.logger.Error("update to subtask failed", zap.Error(err1), zap.Stack("stack"))
-	}
-	m.logger.Error("update error to subtask", zap.Int64("task-id", taskID), zap.Error(err1), zap.Stack("stack"))
+	err1 := m.runWithRetry(func() error {
+		return m.taskTable.FailSubtask(m.ctx, m.id, taskID, err)
+	}, "update to subtask failed")
+	if err1 == nil {
+		m.logger.Error("update error to subtask success", zap.Int64("task-id", taskID), zap.Error(err1), zap.Stack("stack"))
+	}
+}
+
+func (m *Manager) runWithRetry(fn func() error, msg string) error {
+	backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval)
+	err1 := handle.RunWithRetry(m.ctx, scheduler.RetrySQLTimes, backoffer, m.logger,
+		func(_ context.Context) (bool, error) {
+			return true, fn()
+		},
+	)
+	if err1 != nil {
+		m.logger.Warn(msg, zap.Error(err1))
+	}
+	return err1
 }
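
Besides deduplicating the loops, the new helper swaps the fixed 500ms sleep for exponential backoff: backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval) presumably yields waits that double with each attempt and are capped at the max interval, while handle.RunWithRetry drives the attempts (its exact contract is assumed here from the call site in the diff). A standalone sketch of those assumed backoff semantics — the concrete base and cap values below are hypothetical, not the scheduler package's real constants:

package main

import (
	"fmt"
	"time"
)

// exponential mimics the assumed behavior of backoff.NewExponential:
// wait = base * multiplier^retryCnt, capped at maxInterval.
type exponential struct {
	base        time.Duration
	multiplier  float64
	maxInterval time.Duration
}

// Backoff returns how long to wait before the retryCnt-th retry.
func (e exponential) Backoff(retryCnt int) time.Duration {
	d := float64(e.base)
	for i := 0; i < retryCnt; i++ {
		d *= e.multiplier
	}
	if wait := time.Duration(d); wait < e.maxInterval {
		return wait
	}
	return e.maxInterval
}

func main() {
	// Hypothetical values chosen only to show the doubling-with-cap shape.
	b := exponential{base: 3 * time.Second, multiplier: 2, maxInterval: 30 * time.Second}
	for i := 0; i < 6; i++ {
		fmt.Println(i, b.Backoff(i)) // 3s, 6s, 12s, 24s, 30s, 30s
	}
}

Early attempts retry quickly, while a persistently failing meta write backs off instead of hammering the storage with a fixed-interval loop.
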
7 changes: 4 additions & 3 deletions pkg/disttask/framework/taskexecutor/manager_test.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/pingcap/tidb/pkg/disttask/framework/mock"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
+	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
 	"github.com/pingcap/tidb/pkg/disttask/framework/storage"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/pingcap/tidb/pkg/util/memory"
@@ -453,11 +454,11 @@ func TestManagerInitMeta(t *testing.T) {
 	require.NoError(t, m.InitMeta())
 	require.True(t, ctrl.Satisfied())
 
-	bak := retrySQLTimes
+	bak := scheduler.RetrySQLTimes
 	t.Cleanup(func() {
-		retrySQLTimes = bak
+		scheduler.RetrySQLTimes = bak
 	})
-	retrySQLTimes = 1
+	scheduler.RetrySQLTimes = 1
 	mockTaskTable.EXPECT().InitMeta(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("mock err"))
 	require.ErrorContains(t, m.InitMeta(), "mock err")
 	require.True(t, ctrl.Satisfied())
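With the package-private retrySQLTimes gone, the test now tweaks the exported scheduler.RetrySQLTimes and restores it via t.Cleanup, so the failure path makes one attempt instead of the old default of 30. The same save/override/restore idiom, sketched as a standalone test — retryTimes and runWithRetry here are stand-ins, not the real identifiers:

package example

import (
	"errors"
	"testing"
)

// retryTimes stands in for the exported scheduler.RetrySQLTimes knob.
var retryTimes = 30

func runWithRetry(fn func() error) error {
	var err error
	for i := 0; i < retryTimes; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

func TestInitMetaFailsFast(t *testing.T) {
	bak := retryTimes
	t.Cleanup(func() { retryTimes = bak }) // restore even if the test fails
	retryTimes = 1                         // fail after one attempt instead of 30

	calls := 0
	err := runWithRetry(func() error {
		calls++
		return errors.New("mock err")
	})
	if err == nil || calls != 1 {
		t.Fatalf("want 1 failing call, got %d (err=%v)", calls, err)
	}
}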
