br: Enable checkpoint advancer to pause tasks lagged too large (#51441) (#52554)

close #50803
ti-chi-bot authored Apr 18, 2024
1 parent 4c3389e commit be2e310
Showing 10 changed files with 264 additions and 24 deletions.
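What the change does, end to end: on each important tick the advancer now measures how far the task's last checkpoint trails the current TS, pauses the task once the lag exceeds the configured CheckPointLagLimit, and a watcher on the pause key prefix flips an isPaused flag that turns subsequent ticks into no-ops until the task is resumed. Below is a minimal, self-contained sketch of the lag-and-pause core under those semantics; the trimmed env interface, localEnv fake, and checkAndPause helper are illustrative stand-ins, and only the oracle calls are the real client-go API.

// Sketch only: the real advancer lives in br/pkg/streamhelper; this trims
// its environment to the two calls the new lag check needs.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// env is an illustrative stand-in for the real environment interface.
type env interface {
	FetchCurrentTS(ctx context.Context) (uint64, error)
	PauseTask(ctx context.Context, taskName string) error
}

// localEnv fakes PD by composing a TSO from the local clock, mirroring the
// PDRegionScanner.FetchCurrentTS added in advancer_env.go.
type localEnv struct{}

func (localEnv) FetchCurrentTS(ctx context.Context) (uint64, error) {
	return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}

func (localEnv) PauseTask(ctx context.Context, name string) error {
	fmt.Println("paused task:", name)
	return nil
}

// checkAndPause mirrors the new importantTick logic: measure how far the
// last checkpoint trails the current TS, and pause the task past the limit.
func checkAndPause(ctx context.Context, e env, task string, lastCheckpointTS uint64, limit time.Duration) error {
	if limit <= 0 {
		return nil // a non-positive limit disables the check
	}
	now, err := e.FetchCurrentTS(ctx)
	if err != nil {
		return err
	}
	lag := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(lastCheckpointTS))
	if lag > limit {
		return e.PauseTask(ctx, task)
	}
	return nil
}

func main() {
	// A checkpoint two minutes old trips a one-minute limit.
	old := oracle.ComposeTS(time.Now().Add(-2*time.Minute).UnixMilli(), 0)
	_ = checkAndPause(context.Background(), localEnv{}, "demo-task", old, time.Minute)
}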
5 changes: 4 additions & 1 deletion br/pkg/streamhelper/BUILD.bazel
@@ -67,7 +67,8 @@ go_test(
"subscription_test.go",
],
flaky = True,
shard_count = 22,
race = "on",
shard_count = 25,
deps = [
":streamhelper",
"//br/pkg/errors",
@@ -88,8 +89,10 @@ go_test(
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
41 changes: 40 additions & 1 deletion br/pkg/streamhelper/advancer.go
@@ -71,6 +71,7 @@ type CheckpointAdvancer struct {
lastCheckpoint *checkpoint
lastCheckpointMu sync.Mutex
inResolvingLock atomic.Bool
isPaused atomic.Bool

checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@@ -446,6 +447,14 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
case EventPause:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(false, true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(true, false)
}
case EventErr:
return e.Err
}
@@ -543,13 +552,43 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
return true, nil
}
return false, nil
}
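
A note on the conversion above: a TSO packs a physical millisecond timestamp above an 18-bit logical counter, so GetTimeFromTS just recovers the physical part and the lag computation is plain wall-clock subtraction. A standalone round-trip check:

// Round-trip of the TSO encoding behind the lag computation: ComposeTS
// packs physical milliseconds above an 18-bit logical counter, and
// GetTimeFromTS recovers the physical part.
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	physical := time.Now().UnixMilli()
	ts := oracle.ComposeTS(physical, 0)
	fmt.Println(oracle.GetTimeFromTS(ts).UnixMilli() == physical) // true
}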

func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(ctx, c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
if err != nil {
return errors.Annotate(err, "failed to pause task")
}
return errors.Errorf("checkpoint lagged too large")
}
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
if err != nil {
return errors.Annotatef(err,
@@ -605,7 +644,7 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
88 changes: 66 additions & 22 deletions br/pkg/streamhelper/advancer_cliext.go
@@ -29,6 +29,8 @@ const (
EventAdd EventType = iota
EventDel
EventErr
EventPause
EventResume
)

func (t EventType) String() string {
@@ -39,6 +41,10 @@ func (t EventType) String() string {
return "Del"
case EventErr:
return "Err"
case EventPause:
return "Pause"
case EventResume:
return "Resume"
}
return "Unknown"
}
@@ -70,29 +76,47 @@ func errorEvent(err error) TaskEvent {
}

func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) {
if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument,
"the path isn't a task path (%s)", string(event.Kv.Key))
te := TaskEvent{}
var prefix string

if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
prefix = PrefixOfTask()
te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
} else if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfPause())) {
prefix = PrefixOfPause()
te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
} else {
return TaskEvent{},
errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task/pause path (%s)",
string(event.Kv.Key))
}

te := TaskEvent{}
te.Name = strings.TrimPrefix(string(event.Kv.Key), PrefixOfTask())
if event.Type == clientv3.EventTypeDelete {
te.Type = EventDel
} else if event.Type == clientv3.EventTypePut {
switch {
case event.Type == clientv3.EventTypePut && prefix == PrefixOfTask():
te.Type = EventAdd
} else {
return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "event type is wrong (%s)", event.Type)
case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfTask():
te.Type = EventDel
case event.Type == clientv3.EventTypePut && prefix == PrefixOfPause():
te.Type = EventPause
case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfPause():
te.Type = EventResume
default:
return TaskEvent{},
errors.Annotatef(berrors.ErrInvalidArgument,
"invalid event type or prefix: type=%s, prefix=%s", event.Type, prefix)
}

te.Info = new(backuppb.StreamBackupTaskInfo)
if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
return TaskEvent{}, err
}

var err error
te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx)
if err != nil {
return TaskEvent{}, err
}

return te, nil
}
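
Spelled out, the switch above implements a two-dimensional mapping of key prefix × etcd event type:

    prefix             Put           Delete
    PrefixOfTask()     EventAdd      EventDel
    PrefixOfPause()    EventPause    EventResume

In particular, a resume is signaled by deleting the pause key; there is no dedicated resume marker.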

Expand All @@ -113,7 +137,10 @@ func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResp
}

func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
taskCh := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
pauseCh := t.Client.Watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev))

// handleResponse converts a watch response into task events and forwards
// them to ch; it reports whether listening should continue.
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
@@ -126,21 +153,26 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
}
return true
}

// collectRemaining drains events still buffered in the watch channels
// before returning.
collectRemaining := func() {
log.Info("[log backup advancer] Start collecting remaining events in the channel.",
zap.Int("remained", len(c)))
defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
log.Info("Start collecting remaining events in the channel.", zap.String("category", "log backup advancer"),
zap.Int("remained", len(taskCh)))
defer log.Info("Finish collecting remaining events in the channel.", zap.String("category", "log backup advancer"))
for {
if taskCh == nil && pauseCh == nil {
return
}

select {
case resp, ok := <-c:
if !ok {
return
case resp, ok := <-taskCh:
if !ok || !handleResponse(resp) {
taskCh = nil
}
if !handleResponse(resp) {
return
case resp, ok := <-pauseCh:
if !ok || !handleResponse(resp) {
pauseCh = nil
}
default:
return
}
}
}
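
The drain loop above relies on a standard Go idiom worth calling out: a receive from a nil channel blocks forever, so setting taskCh or pauseCh to nil once it is exhausted removes its case from the select without extra bookkeeping. A minimal standalone demo of the pattern:

// Standalone illustration of the nil-channel idiom collectRemaining relies
// on: receiving from a nil channel blocks forever, so nil-ing a drained
// channel variable disables its case in the select.
package main

import "fmt"

func main() {
	a, b := make(chan int, 1), make(chan int, 1)
	a <- 1
	b <- 2
	close(a)
	close(b)
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // drained and closed: this case never fires again
				continue
			}
			fmt.Println("from a:", v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			fmt.Println("from b:", v)
		}
	}
}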
@@ -149,7 +181,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
defer close(ch)
for {
select {
case resp, ok := <-c:
case resp, ok := <-taskCh:
failpoint.Inject("advancer_close_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
Expand All @@ -161,6 +193,18 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
if !handleResponse(resp) {
return
}
case resp, ok := <-pauseCh:
failpoint.Inject("advancer_close_pause_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
})
if !ok {
ch <- errorEvent(io.EOF)
return
}
if !handleResponse(resp) {
return
}
case <-ctx.Done():
collectRemaining()
ch <- errorEvent(ctx.Err())
7 changes: 7 additions & 0 deletions br/pkg/streamhelper/advancer_env.go
@@ -10,6 +10,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util/engine"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
@@ -48,6 +49,11 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
}

// TODO: It should be able to synchronize the current TS with PD.
func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error) {
@@ -152,6 +158,7 @@ type StreamMeta interface {
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
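// PauseTask pauses the task with the given name in the meta store.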
PauseTask(ctx context.Context, taskName string) error
}

var _ tikv.RegionLockResolver = &AdvancerLockResolver{}
78 changes: 78 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
@@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -463,3 +464,80 @@ func TestRemoveTaskAndFlush(t *testing.T) {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}

func TestEnableCheckPointLimit(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(30 * time.Second)
c.advanceCheckpointBy(20 * time.Second)
require.NoError(t, adv.OnTick(ctx))
}
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some time, isPaused will be set and subsequent ticks are skipped
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
}

func TestCheckPointResume(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
// now the checkpoint issue is fixed and the task is resumed
c.advanceCheckpointBy(1 * time.Minute)
env.ResumeTask(ctx)
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
// as time passes, the checkpoint lag will exceed the limit again
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}