From df231d0d0b5eb52115498e1d2cf39a5abc9c486d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com>
Date: Wed, 7 Feb 2024 17:40:15 +0800
Subject: [PATCH] br/streamhelper: fix panic while removing task (#50869)

close pingcap/tidb#50839
---
 br/pkg/streamhelper/BUILD.bazel           |  2 +-
 br/pkg/streamhelper/advancer.go           | 46 +++++++++++++++++++----
 br/pkg/streamhelper/advancer_test.go      | 26 +++++++++++++
 br/pkg/streamhelper/basic_lib_for_test.go |  9 +++++
 br/pkg/utils/worker.go                    | 14 +++++++
 5 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index 19926e96aa0df..5f90552cb1a65 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 20,
+    shard_count = 22,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index 797618ea2a001..d475e86420431 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -78,6 +78,22 @@ type CheckpointAdvancer struct {
 	subscriberMu sync.Mutex
 }
 
+// HasTask returns whether the advancer has been bound to a task.
+func (c *CheckpointAdvancer) HasTask() bool {
+	c.taskMu.Lock()
+	defer c.taskMu.Unlock()
+
+	return c.task != nil
+}
+
+// HasSubscription returns whether the advancer is associated with a subscriber.
+func (c *CheckpointAdvancer) HasSubscription() bool {
+	c.subscriberMu.Lock()
+	defer c.subscriberMu.Unlock()
+
+	return c.subscriber != nil && len(c.subscriber.subscriptions) > 0
+}
+
 // checkpoint represents the TS with specific range.
 // it's only used in advancer.go.
 type checkpoint struct {
@@ -363,6 +379,12 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
 	}()
 }
 
+func (c *CheckpointAdvancer) setCheckpoints(cps *spans.ValueSortedFull) {
+	c.checkpointsMu.Lock()
+	c.checkpoints = cps
+	c.checkpointsMu.Unlock()
+}
+
 func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
 	c.taskMu.Lock()
 	defer c.taskMu.Unlock()
@@ -371,7 +393,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 		utils.LogBackupTaskCountInc()
 		c.task = e.Info
 		c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
-		c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
+		c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
 		c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
 		log.Info("added event", zap.Stringer("task", e.Info),
 			zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
@@ -379,12 +401,12 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 		utils.LogBackupTaskCountDec()
 		c.task = nil
 		c.taskRange = nil
-		c.checkpoints = nil
 		// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
 		// Do the null check because some of test cases won't equip the advancer with subscriber.
 		if c.subscriber != nil {
 			c.subscriber.Clear()
 		}
+		c.setCheckpoints(nil)
 		if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
 			log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
 		}
@@ -444,8 +466,10 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
 	defer c.subscriberMu.Unlock()
 	c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
 	es := c.subscriber.Events()
+	log.Info("Subscription handler spawned.", zap.String("category", "log backup subscription manager"))
 
 	go func() {
+		defer utils.CatchAndLogPanic()
 		for {
 			select {
 			case <-ctx.Done():
@@ -454,12 +478,18 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
 				if !ok {
 					return
 				}
-				c.checkpointsMu.Lock()
-				log.Debug("Accepting region flush event.",
-					zap.Stringer("range", logutil.StringifyRange(event.Key)),
-					zap.Uint64("checkpoint", event.Value))
-				c.checkpoints.Merge(event)
-				c.checkpointsMu.Unlock()
+				failpoint.Inject("subscription-handler-loop", func() {})
+				c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
+					if vsf == nil {
+						log.Warn("Span tree not found, perhaps stale event of removed tasks.",
+							zap.String("category", "log backup subscription manager"))
+						return
+					}
+					log.Debug("Accepting region flush event.",
+						zap.Stringer("range", logutil.StringifyRange(event.Key)),
+						zap.Uint64("checkpoint", event.Value))
+					vsf.Merge(event)
+				})
 			}
 		}
 	}()
diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go
index 7ffe5daafd0b0..0f120df2f83cf 100644
--- a/br/pkg/streamhelper/advancer_test.go
+++ b/br/pkg/streamhelper/advancer_test.go
@@ -373,3 +373,29 @@ func TestOwnerDropped(t *testing.T) {
 		require.Equal(t, vsf.MinValue(), cp)
 	})
 }
+
+// TestRemoveTaskAndFlush tests the bug described in #50839.
+func TestRemoveTaskAndFlush(t *testing.T) {
+	log.SetLevel(zapcore.DebugLevel)
+	ctx := context.Background()
+	c := createFakeCluster(t, 4, true)
+	installSubscribeSupport(c)
+	env := &testEnv{
+		fakeCluster: c,
+		testCtx:     t,
+	}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.StartTaskListener(ctx)
+	adv.SpawnSubscriptionHandler(ctx)
+	require.NoError(t, adv.OnTick(ctx))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop", "pause"))
+	c.flushAll()
+	env.unregisterTask()
+	require.Eventually(t, func() bool {
+		return !adv.HasTask()
+	}, 10*time.Second, 100*time.Millisecond)
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
+	require.Eventually(t, func() bool {
+		return !adv.HasSubscription()
+	}, 10*time.Second, 100*time.Millisecond)
+}
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index 789d4e5f1d400..8d0381e6aae58 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -575,6 +575,7 @@ type testEnv struct {
 	checkpoint   uint64
 	testCtx      *testing.T
 	ranges       []kv.KeyRange
+	taskCh       chan<- streamhelper.TaskEvent
 
 	resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)
 
@@ -596,6 +597,7 @@ func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) e
 		Ranges: rngs,
 	}
 	ch <- tsk
+	t.taskCh = ch
 	return nil
 }
 
@@ -625,6 +627,13 @@ func (t *testEnv) getCheckpoint() uint64 {
 	return t.checkpoint
 }
 
+func (t *testEnv) unregisterTask() {
+	t.taskCh <- streamhelper.TaskEvent{
+		Type: streamhelper.EventDel,
+		Name: "whole",
+	}
+}
+
 func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
 	for _, r := range t.regions {
 		if len(r.locks) != 0 {
diff --git a/br/pkg/utils/worker.go b/br/pkg/utils/worker.go
index ba69752ca5d23..565ee651620a1 100644
--- a/br/pkg/utils/worker.go
+++ b/br/pkg/utils/worker.go
@@ -130,3 +130,17 @@ func PanicToErr(err *error) {
 		log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
 	}
 }
+
+// CatchAndLogPanic recovers from a panic during execution and logs it.
+// Generally, this would be used with `defer`, like:
+//
+//	func foo() {
+//		defer utils.CatchAndLogPanic()
+//		maybePanic()
+//	}
+func CatchAndLogPanic() {
+	item := recover()
+	if item != nil {
+		log.Warn("CatchAndLogPanic: panicked, but ignored.", zap.StackSkip("stack", 1), zap.Any("panic", item))
+	}
+}
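
For context on the pattern this patch applies, here is a minimal, self-contained Go sketch (not TiDB code; the `tracker` type, its fields, and method names are invented for illustration): task removal clears a shared span tree while a background loop keeps merging flush events into it, so the loop must reach the tree only through a lock-holding accessor and tolerate a nil tree instead of dereferencing a stale pointer.

package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for CheckpointAdvancer: it owns a
// span tree that a background loop merges flush events into, and that
// task removal may reset to nil at any moment.
type tracker struct {
	mu   sync.Mutex
	tree map[string]uint64 // stand-in for *spans.ValueSortedFull
}

// withTree mirrors the WithCheckpoints pattern: every access to the
// tree runs under the same mutex, and the callback must handle nil.
func (t *tracker) withTree(f func(map[string]uint64)) {
	t.mu.Lock()
	defer t.mu.Unlock()
	f(t.tree)
}

// setTree mirrors setCheckpoints: installing or clearing the tree also
// happens under the mutex, so it cannot interleave with withTree.
func (t *tracker) setTree(tree map[string]uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.tree = tree
}

func main() {
	tr := &tracker{}
	tr.setTree(map[string]uint64{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Flush events racing with task removal: with the nil check,
		// stale events are dropped instead of panicking.
		for i := 0; i < 1000; i++ {
			tr.withTree(func(tree map[string]uint64) {
				if tree == nil {
					return // stale event of a removed task
				}
				tree["region-1"] = uint64(i)
			})
		}
	}()
	tr.setTree(nil) // the task is removed concurrently
	wg.Wait()
	fmt.Println("no panic: stale flush events were ignored")
}

Before the patch, the subscription loop held `checkpointsMu` but still called `c.checkpoints.Merge(event)` on a field that `EventDel` had already set to nil without the lock; routing both sides through `setCheckpoints`/`WithCheckpoints` and nil-checking, as sketched above, is what closes the window, with `CatchAndLogPanic` as a last-resort guard for the goroutine.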