Commit 7548df7

br: Enable checkpoint advancer to pause tasks lagged too large (#51441)
close #50803
1 parent 0d742d3 commit 7548df7

10 files changed: +261 −22 lines
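In short: the advancer now watches the pause-key prefix in etcd (PrefixOfPause) alongside the task prefix, maps put/delete events on it to new EventPause/EventResume events, skips ticks while an isPaused flag is set, and proactively pauses the task via env.PauseTask once the checkpoint lags behind the current TS by more than the configured CheckPointLagLimit. A minimal sketch of wiring up the new knob, mirroring the tests added in this commit (streamhelper.Env here is assumed to be the environment interface NewCheckpointAdvancer accepts, as in those tests):

// Sketch only, not part of the commit.
package sketch

import (
	"time"

	"github.com/pingcap/tidb/br/pkg/streamhelper"
	"github.com/pingcap/tidb/br/pkg/streamhelper/config"
)

// newLagLimitedAdvancer is a hypothetical helper: the returned advancer
// will pause its task once the checkpoint lags more than a minute
// behind the current timestamp.
func newLagLimitedAdvancer(env streamhelper.Env) *streamhelper.CheckpointAdvancer {
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.UpdateConfigWith(func(c *config.Config) {
		c.CheckPointLagLimit = 1 * time.Minute
	})
	return adv
}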

br/pkg/streamhelper/BUILD.bazel

+3 −1

@@ -68,7 +68,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 22,
+    shard_count = 25,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
@@ -89,8 +89,10 @@ go_test(
         "@com_github_pingcap_kvproto//pkg/logbackuppb",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_log//:log",
+        "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//kv",
+        "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_client_go_v2//tikvrpc",
         "@com_github_tikv_client_go_v2//txnkv/txnlock",

br/pkg/streamhelper/advancer.go

+40 −1

@@ -71,6 +71,7 @@ type CheckpointAdvancer struct {
 	lastCheckpoint   *checkpoint
 	lastCheckpointMu sync.Mutex
 	inResolvingLock  atomic.Bool
+	isPaused         atomic.Bool
 
 	checkpoints   *spans.ValueSortedFull
 	checkpointsMu sync.Mutex
@@ -446,6 +447,14 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 			log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
 		}
 		metrics.LastCheckpoint.DeleteLabelValues(e.Name)
+	case EventPause:
+		if c.task.GetName() == e.Name {
+			c.isPaused.CompareAndSwap(false, true)
+		}
+	case EventResume:
+		if c.task.GetName() == e.Name {
+			c.isPaused.CompareAndSwap(true, false)
+		}
 	case EventErr:
 		return e.Err
 	}
@@ -544,13 +553,43 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
 	return c.subscriber.PendingErrors()
 }
 
+func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
+	if c.cfg.CheckPointLagLimit <= 0 {
+		return false, nil
+	}
+
+	now, err := c.env.FetchCurrentTS(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
+	if lagDuration > c.cfg.CheckPointLagLimit {
+		log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
+			zap.Stringer("lag", lagDuration))
+		return true, nil
+	}
+	return false, nil
+}
+
 func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
 	c.checkpointsMu.Lock()
 	c.setCheckpoint(ctx, c.checkpoints.Min())
 	c.checkpointsMu.Unlock()
 	if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
 		return errors.Annotate(err, "failed to upload global checkpoint")
 	}
+	isLagged, err := c.isCheckpointLagged(ctx)
+	if err != nil {
+		return errors.Annotate(err, "failed to check timestamp")
+	}
+	if isLagged {
+		err := c.env.PauseTask(ctx, c.task.Name)
+		if err != nil {
+			return errors.Annotate(err, "failed to pause task")
+		}
+		return errors.Annotate(errors.Errorf("check point lagged too large"), "check point lagged too large")
+	}
 	p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
 	if err != nil {
 		return errors.Annotatef(err,
@@ -606,7 +645,7 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
 func (c *CheckpointAdvancer) tick(ctx context.Context) error {
 	c.taskMu.Lock()
 	defer c.taskMu.Unlock()
-	if c.task == nil {
+	if c.task == nil || c.isPaused.Load() {
 		log.Debug("No tasks yet, skipping advancing.")
 		return nil
 	}
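The lag check compares wall-clock times decoded from two TSOs. A self-contained sketch of that arithmetic using the same oracle helpers (the timestamps here are made up for illustration):

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	lagLimit := time.Minute

	// A TSO packs the physical time (milliseconds) into its high bits;
	// ComposeTS builds one, GetTimeFromTS decodes it back to wall time.
	checkpoint := oracle.ComposeTS(time.Now().Add(-90*time.Second).UnixMilli(), 0)
	now := oracle.ComposeTS(time.Now().UnixMilli(), 0)

	// Same computation as isCheckpointLagged above.
	lag := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(checkpoint))
	fmt.Printf("lag=%v exceeds limit: %v\n", lag, lag > lagLimit)
}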

br/pkg/streamhelper/advancer_cliext.go

+64 −20

@@ -29,6 +29,8 @@ const (
 	EventAdd EventType = iota
 	EventDel
 	EventErr
+	EventPause
+	EventResume
 )
 
 func (t EventType) String() string {
@@ -39,6 +41,10 @@ func (t EventType) String() string {
 		return "Del"
 	case EventErr:
 		return "Err"
+	case EventPause:
+		return "Pause"
+	case EventResume:
+		return "Resume"
 	}
 	return "Unknown"
 }
@@ -70,29 +76,47 @@ func errorEvent(err error) TaskEvent {
 }
 
 func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) {
-	if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
-		return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument,
-			"the path isn't a task path (%s)", string(event.Kv.Key))
+	te := TaskEvent{}
+	var prefix string
+
+	if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
+		prefix = PrefixOfTask()
+		te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
+	} else if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfPause())) {
+		prefix = PrefixOfPause()
+		te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
+	} else {
+		return TaskEvent{},
+			errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task/pause path (%s)",
+				string(event.Kv.Key))
 	}
 
-	te := TaskEvent{}
-	te.Name = strings.TrimPrefix(string(event.Kv.Key), PrefixOfTask())
-	if event.Type == clientv3.EventTypeDelete {
-		te.Type = EventDel
-	} else if event.Type == clientv3.EventTypePut {
+	switch {
+	case event.Type == clientv3.EventTypePut && prefix == PrefixOfTask():
 		te.Type = EventAdd
-	} else {
-		return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "event type is wrong (%s)", event.Type)
+	case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfTask():
+		te.Type = EventDel
+	case event.Type == clientv3.EventTypePut && prefix == PrefixOfPause():
+		te.Type = EventPause
+	case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfPause():
+		te.Type = EventResume
+	default:
+		return TaskEvent{},
+			errors.Annotatef(berrors.ErrInvalidArgument,
+				"invalid event type or prefix: type=%s, prefix=%s", event.Type, prefix)
 	}
+
 	te.Info = new(backuppb.StreamBackupTaskInfo)
 	if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
 		return TaskEvent{}, err
 	}
+
 	var err error
 	te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx)
 	if err != nil {
 		return TaskEvent{}, err
 	}
+
 	return te, nil
 }
 
@@ -113,7 +137,10 @@ func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResp
 }
 
 func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
-	c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+	taskCh := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+	pauseCh := t.Client.Watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+
+	// inner function def
 	handleResponse := func(resp clientv3.WatchResponse) bool {
 		events, err := t.eventFromWatch(ctx, resp)
 		if err != nil {
@@ -127,21 +154,26 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
 		}
 		return true
 	}
+
+	// inner function def
 	collectRemaining := func() {
 		log.Info("Start collecting remaining events in the channel.", zap.String("category", "log backup advancer"),
-			zap.Int("remained", len(c)))
+			zap.Int("remained", len(taskCh)))
 		defer log.Info("Finish collecting remaining events in the channel.", zap.String("category", "log backup advancer"))
 		for {
+			if taskCh == nil && pauseCh == nil {
+				return
+			}
+
 			select {
-			case resp, ok := <-c:
-				if !ok {
-					return
+			case resp, ok := <-taskCh:
+				if !ok || !handleResponse(resp) {
+					taskCh = nil
 				}
-				if !handleResponse(resp) {
-					return
+			case resp, ok := <-pauseCh:
+				if !ok || !handleResponse(resp) {
+					pauseCh = nil
 				}
-			default:
-				return
 			}
 		}
 	}
@@ -150,7 +182,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
 		defer close(ch)
 		for {
 			select {
-			case resp, ok := <-c:
+			case resp, ok := <-taskCh:
 				failpoint.Inject("advancer_close_channel", func() {
 					// We cannot really close the channel, just simulating it.
 					ok = false
@@ -162,6 +194,18 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
 				if !handleResponse(resp) {
 					return
 				}
+			case resp, ok := <-pauseCh:
+				failpoint.Inject("advancer_close_pause_channel", func() {
+					// We cannot really close the channel, just simulating it.
+					ok = false
+				})
+				if !ok {
+					ch <- errorEvent(io.EOF)
+					return
+				}
+				if !handleResponse(resp) {
+					return
+				}
 			case <-ctx.Done():
 				collectRemaining()
 				ch <- errorEvent(ctx.Err())
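collectRemaining now drains two watch channels instead of one, relying on the Go idiom that a receive from a nil channel blocks forever: setting an exhausted channel variable to nil disables its select case, so the loop ends only when both channels are done. A standalone sketch of that idiom (names here are illustrative, not from the commit):

package main

import "fmt"

// drainBoth receives from two channels until both are closed, nil-ing
// each one as it closes so its select case stops firing.
func drainBoth(a, b <-chan int) {
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // channel closed: stop selecting on it
				continue
			}
			fmt.Println("a:", v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			fmt.Println("b:", v)
		}
	}
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)
	drainBoth(a, b)
}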

br/pkg/streamhelper/advancer_env.go

+7

@@ -10,6 +10,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/util/engine"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/txnkv/txnlock"
 	pd "github.com/tikv/pd/client"
@@ -48,6 +49,11 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
 	return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
 }
 
+// TODO: It should be able to synchronize the current TS with the PD.
+func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
+	return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
+}
+
 // RegionScan gets a list of regions, starts from the region that contains key.
 // Limit limits the maximum number of regions returned.
 func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error) {
@@ -152,6 +158,7 @@ type StreamMeta interface {
 	UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
 	// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
 	ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
+	PauseTask(ctx context.Context, taskName string) error
 }
 
 var _ tikv.RegionLockResolver = &AdvancerLockResolver{}
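The new FetchCurrentTS composes a TSO from the local clock; the TODO notes it should come from PD instead. A hypothetical sketch of that direction, not part of the commit, assuming the pd.Client GetTS method (which returns the physical and logical parts of a TSO):

// Sketch only.
package sketch

import (
	"context"

	"github.com/tikv/client-go/v2/oracle"
	pd "github.com/tikv/pd/client"
)

// fetchCurrentTSFromPD asks PD's TSO for the current timestamp rather
// than trusting the local clock, avoiding clock skew between nodes.
func fetchCurrentTSFromPD(ctx context.Context, pdc pd.Client) (uint64, error) {
	physical, logical, err := pdc.GetTS(ctx)
	if err != nil {
		return 0, err
	}
	return oracle.ComposeTS(physical, logical), nil
}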

br/pkg/streamhelper/advancer_test.go

+78

@@ -17,6 +17,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/streamhelper/config"
 	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -463,3 +464,80 @@ func TestRemoveTaskAndFlush(t *testing.T) {
 		return !adv.HasSubscribion()
 	}, 10*time.Second, 100*time.Millisecond)
 }
+
+func TestEnableCheckPointLimit(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	for i := 0; i < 5; i++ {
+		c.advanceClusterTimeBy(30 * time.Second)
+		c.advanceCheckpointBy(20 * time.Second)
+		require.NoError(t, adv.OnTick(ctx))
+	}
+}
+
+func TestCheckPointLagged(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+	// after some time, isPaused will be set and ticks are skipped
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+}
+
+func TestCheckPointResume(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+	// now the checkpoint issue is fixed and the task is resumed
+	c.advanceCheckpointBy(1 * time.Minute)
+	env.ResumeTask(ctx)
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+	// with time passing, the checkpoint will exceed the limit again
+	c.advanceClusterTimeBy(2 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+}
