
Commit a2fae33

Merge branch 'master' into rename-explaintest
2 parents 36b257c + ae442ad

12 files changed, +452 -48 lines changed

disttask/framework/dispatcher/dispatcher.go (+12 -8)
@@ -44,8 +44,12 @@ const (
 var (
     checkTaskFinishedInterval = 500 * time.Millisecond
     nonRetrySQLTime           = 1
-    retrySQLTimes             = 30
-    retrySQLInterval          = 3 * time.Second
+    // RetrySQLTimes is the max retry times when executing SQL.
+    RetrySQLTimes = 30
+    // RetrySQLInterval is the initial interval between two SQL retries.
+    RetrySQLInterval = 3 * time.Second
+    // RetrySQLMaxInterval is the max interval between two SQL retries.
+    RetrySQLMaxInterval = 30 * time.Second
 )
 
 // TaskHandle provides the interface for operations needed by Dispatcher.
@@ -191,7 +195,7 @@ func (d *BaseDispatcher) onReverting() error {
     if prevStageFinished {
         // Finish the rollback step.
         logutil.Logger(d.logCtx).Info("update the task to reverted state")
-        return d.updateTask(proto.TaskStateReverted, nil, retrySQLTimes)
+        return d.updateTask(proto.TaskStateReverted, nil, RetrySQLTimes)
     }
     // Wait all subtasks in this stage finished.
     d.OnTick(d.ctx, d.task)
@@ -321,7 +325,7 @@ func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subta
             logutil.Logger(d.logCtx).Warn("updateTask first failed", zap.String("from", prevState), zap.String("to", d.task.State),
                 zap.Int("retry times", retryTimes), zap.Error(err))
         }
-        time.Sleep(retrySQLInterval)
+        time.Sleep(RetrySQLInterval)
     }
     if err != nil && retryTimes != nonRetrySQLTime {
         logutil.Logger(d.logCtx).Warn("updateTask failed",
@@ -354,7 +358,7 @@ func (d *BaseDispatcher) dispatchSubTask4Revert(task *proto.Task, meta []byte) e
     for _, id := range instanceIDs {
         subTasks = append(subTasks, proto.NewSubtask(task.ID, task.Type, id, meta))
     }
-    return d.updateTask(proto.TaskStateReverting, subTasks, retrySQLTimes)
+    return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
 }
 
 func (d *BaseDispatcher) onNextStage() error {
@@ -377,7 +381,7 @@ func (d *BaseDispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error
         task.Concurrency = MaxSubtaskConcurrency
     }
 
-    retryTimes := retrySQLTimes
+    retryTimes := RetrySQLTimes
     // 2. Special handling for the new tasks.
     if task.State == proto.TaskStatePending {
         // TODO: Consider using TS.
@@ -428,7 +432,7 @@ func (d *BaseDispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error
         logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
         subTasks = append(subTasks, proto.NewSubtask(task.ID, task.Type, instanceID, meta))
     }
-    return d.updateTask(proto.TaskStateRunning, subTasks, retrySQLTimes)
+    return d.updateTask(proto.TaskStateRunning, subTasks, RetrySQLTimes)
 }
 
 func (d *BaseDispatcher) handlePlanErr(err error) error {
@@ -438,7 +442,7 @@ func (d *BaseDispatcher) handlePlanErr(err error) error {
     }
     d.task.Error = err
     // state transform: pending -> failed.
-    return d.updateTask(proto.TaskStateFailed, nil, retrySQLTimes)
+    return d.updateTask(proto.TaskStateFailed, nil, RetrySQLTimes)
 }
 
 // GenerateSchedulerNodes generate a eligible TiDB nodes.
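
Note that this hunk exports the retry knobs and adds a RetrySQLMaxInterval cap, while the dispatcher itself still sleeps a flat RetrySQLInterval in updateTask. A minimal sketch of how a caller elsewhere might pair these constants with the util/backoff package touched later in this commit; the call site, the growth factor of 2, and the assumption that backoff.NewExponential takes (initial, multiplier, max) are illustrative only and not shown in this diff:

package main

import (
    "fmt"

    "github.com/pingcap/tidb/disttask/framework/dispatcher"
    "github.com/pingcap/tidb/util/backoff"
)

func main() {
    // Hypothetical wiring: start at RetrySQLInterval (3s), grow each attempt,
    // and never wait longer than RetrySQLMaxInterval (30s).
    backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval)
    for i := 0; i < dispatcher.RetrySQLTimes; i++ {
        // Roughly 3s, 6s, 12s, 24s, then capped at 30s for later attempts.
        fmt.Println(backoffer.Backoff(i))
    }
}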

disttask/framework/dispatcher/main_test.go (+1 -1)
@@ -43,7 +43,7 @@ func TestMain(m *testing.M) {
     // Make test more fast.
     checkTaskRunningInterval = checkTaskRunningInterval / 10
     checkTaskFinishedInterval = checkTaskFinishedInterval / 10
-    retrySQLInterval = retrySQLInterval / 20
+    RetrySQLInterval = RetrySQLInterval / 20
 
     opts := []goleak.Option{
         goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),

disttask/framework/handle/BUILD.bazel (+4)
@@ -8,6 +8,7 @@ go_library(
     deps = [
         "//disttask/framework/proto",
         "//disttask/framework/storage",
+        "//util/backoff",
         "//util/logutil",
         "@com_github_pingcap_errors//:errors",
         "@org_uber_go_zap//:zap",
@@ -24,7 +25,10 @@ go_test(
         "//disttask/framework/proto",
         "//disttask/framework/storage",
         "//testkit",
+        "//util/backoff",
         "@com_github_ngaut_pools//:pools",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_log//:log",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//util",
     ],

disttask/framework/handle/handle.go (+31)
@@ -21,6 +21,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/tidb/disttask/framework/proto"
     "github.com/pingcap/tidb/disttask/framework/storage"
+    "github.com/pingcap/tidb/util/backoff"
     "github.com/pingcap/tidb/util/logutil"
     "go.uber.org/zap"
 )
@@ -118,3 +119,33 @@ func CancelGlobalTask(taskKey string) error {
     }
     return globalTaskManager.CancelGlobalTask(globalTask.ID)
 }
+
+// RunWithRetry runs a function with retry, when retry exceed max retry time, it
+// returns the last error met.
+// if the function fails with err, it should return a bool to indicate whether
+// the error is retryable.
+// if context done, it will stop early and return ctx.Err().
+func RunWithRetry(
+    ctx context.Context,
+    maxRetry int,
+    backoffer backoff.Backoffer,
+    logger *zap.Logger,
+    f func(context.Context) (bool, error),
+) error {
+    var lastErr error
+    for i := 0; i < maxRetry; i++ {
+        retryable, err := f(ctx)
+        if err == nil || !retryable {
+            return err
+        }
+        lastErr = err
+        logger.Warn("met retryable error", zap.Int("retry-count", i),
+            zap.Int("max-retry", maxRetry), zap.Error(err))
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case <-time.After(backoffer.Backoff(i)):
+        }
+    }
+    return lastErr
+}
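
RunWithRetry is the general-purpose retry loop this commit adds: the callback reports whether its error is retryable, and the backoffer decides how long to wait between attempts; on success or a non-retryable error it returns immediately. A minimal usage sketch based only on the signature above; doSomething and the chosen backoff parameters are hypothetical:

package main

import (
    "context"
    "time"

    "github.com/pingcap/log"
    "github.com/pingcap/tidb/disttask/framework/handle"
    "github.com/pingcap/tidb/util/backoff"
)

// doSomething is a hypothetical operation standing in for a real SQL call.
func doSomething(ctx context.Context) error { return nil }

func main() {
    ctx := context.Background()
    // Wait 500ms after the first failure, growing toward a 5s cap (illustrative values).
    backoffer := backoff.NewExponential(500*time.Millisecond, 2, 5*time.Second)
    err := handle.RunWithRetry(ctx, 10, backoffer, log.L(),
        func(ctx context.Context) (bool, error) {
            if err := doSomething(ctx); err != nil {
                return true, err // treat the failure as transient and retry
            }
            return false, nil // done, no retry needed
        },
    )
    if err != nil {
        log.L().Warn("operation failed after retries")
    }
}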

disttask/framework/handle/handle_test.go (+66)
@@ -16,14 +16,19 @@ package handle_test
 
 import (
     "context"
+    "math"
+    "sync/atomic"
     "testing"
     "time"
 
     "github.com/ngaut/pools"
+    "github.com/pingcap/errors"
+    "github.com/pingcap/log"
     "github.com/pingcap/tidb/disttask/framework/handle"
     "github.com/pingcap/tidb/disttask/framework/proto"
     "github.com/pingcap/tidb/disttask/framework/storage"
     "github.com/pingcap/tidb/testkit"
+    "github.com/pingcap/tidb/util/backoff"
     "github.com/stretchr/testify/require"
     "github.com/tikv/client-go/v2/util"
 )
@@ -56,3 +61,64 @@ func TestHandle(t *testing.T) {
 
     require.NoError(t, handle.CancelGlobalTask("1"))
 }
+
+func TestRunWithRetry(t *testing.T) {
+    ctx := context.Background()
+
+    // retry count exceed
+    backoffer := backoff.NewExponential(100*time.Millisecond, 1, time.Second)
+    err := handle.RunWithRetry(ctx, 3, backoffer, log.L(),
+        func(ctx context.Context) (bool, error) {
+            return true, errors.New("mock error")
+        },
+    )
+    require.ErrorContains(t, err, "mock error")
+
+    // non-retryable error
+    var end atomic.Bool
+    go func() {
+        defer end.Store(true)
+        backoffer = backoff.NewExponential(100*time.Millisecond, 1, time.Second)
+        err = handle.RunWithRetry(ctx, math.MaxInt, backoffer, log.L(),
+            func(ctx context.Context) (bool, error) {
+                return false, errors.New("mock error")
+            },
+        )
+        require.Error(t, err)
+    }()
+    require.Eventually(t, func() bool {
+        return end.Load()
+    }, 5*time.Second, 100*time.Millisecond)
+
+    // fail with retryable error once, then success
+    end.Store(false)
+    go func() {
+        defer end.Store(true)
+        backoffer = backoff.NewExponential(100*time.Millisecond, 1, time.Second)
+        var i int
+        err = handle.RunWithRetry(ctx, math.MaxInt, backoffer, log.L(),
+            func(ctx context.Context) (bool, error) {
+                if i == 0 {
+                    i++
+                    return true, errors.New("mock error")
+                }
+                return false, nil
+            },
+        )
+        require.NoError(t, err)
+    }()
+    require.Eventually(t, func() bool {
+        return end.Load()
+    }, 5*time.Second, 100*time.Millisecond)
+
+    // context done
+    subctx, cancel := context.WithCancel(ctx)
+    cancel()
+    backoffer = backoff.NewExponential(100*time.Millisecond, 1, time.Second)
+    err = handle.RunWithRetry(subctx, math.MaxInt, backoffer, log.L(),
+        func(ctx context.Context) (bool, error) {
+            return true, errors.New("mock error")
+        },
+    )
+    require.ErrorIs(t, err, context.Canceled)
+}

disttask/importinto/BUILD.bazel (+5 -1)
@@ -48,6 +48,7 @@ go_library(
         "//sessionctx/variable",
         "//table/tables",
         "//util",
+        "//util/backoff",
         "//util/dbterror/exeerrors",
         "//util/etcd",
         "//util/logutil",
@@ -68,6 +69,7 @@ go_test(
     timeout = "short",
     srcs = [
         "dispatcher_test.go",
+        "dispatcher_testkit_test.go",
         "encode_and_sort_operator_test.go",
         "planner_test.go",
         "subtask_executor_test.go",
@@ -76,11 +78,12 @@ go_test(
     embed = [":importinto"],
     flaky = True,
     race = "on",
-    shard_count = 6,
+    shard_count = 7,
     deps = [
         "//br/pkg/lightning/checkpoints",
         "//br/pkg/lightning/mydump",
         "//br/pkg/lightning/verification",
+        "//disttask/framework/dispatcher",
         "//disttask/framework/mock/execute",
         "//disttask/framework/planner",
         "//disttask/framework/proto",
@@ -94,6 +97,7 @@ go_test(
         "//parser/mysql",
         "//testkit",
         "//util/logutil",
+        "//util/sqlexec",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
