diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index f0b8cb6a27e6f..c8e6c2b45c738 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
     ],
     flaky = True,
     race = "on",
-    shard_count = 21,
+    shard_count = 22,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index 81e1559147510..2a934e2d80f05 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -78,6 +78,22 @@ type CheckpointAdvancer struct {
 	subscriberMu sync.Mutex
 }
 
+// HasTask returns whether the advancer has been bound to a task.
+func (c *CheckpointAdvancer) HasTask() bool {
+	c.taskMu.Lock()
+	defer c.taskMu.Unlock()
+
+	return c.task != nil
+}
+
+// HasSubscribion returns whether the advancer is associated with a subscriber that has active subscriptions.
+func (c *CheckpointAdvancer) HasSubscribion() bool {
+	c.subscriberMu.Lock()
+	defer c.subscriberMu.Unlock()
+
+	return c.subscriber != nil && len(c.subscriber.subscriptions) > 0
+}
+
 // checkpoint represents the TS with specific range.
 // it's only used in advancer.go.
 type checkpoint struct {
@@ -361,6 +377,12 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
 	}()
 }
 
+func (c *CheckpointAdvancer) setCheckpoints(cps *spans.ValueSortedFull) {
+	c.checkpointsMu.Lock()
+	c.checkpoints = cps
+	c.checkpointsMu.Unlock()
+}
+
 func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
 	c.taskMu.Lock()
 	defer c.taskMu.Unlock()
@@ -369,7 +391,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 		utils.LogBackupTaskCountInc()
 		c.task = e.Info
 		c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
-		c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
+		c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
 		c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
 		p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
 		if err != nil {
@@ -381,12 +403,12 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 		utils.LogBackupTaskCountDec()
 		c.task = nil
 		c.taskRange = nil
-		c.checkpoints = nil
 		// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
 		// Do the null check because some of test cases won't equip the advancer with subscriber.
 		if c.subscriber != nil {
 			c.subscriber.Clear()
 		}
+		c.setCheckpoints(nil)
 		if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
 			log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
 		}
@@ -448,8 +470,10 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
 	defer c.subscriberMu.Unlock()
 	c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
 	es := c.subscriber.Events()
+	log.Info("Subscription handler spawned.", zap.String("category", "log backup subscription manager"))
 
 	go func() {
+		defer utils.CatchAndLogPanic()
 		for {
 			select {
 			case <-ctx.Done():
@@ -458,12 +482,18 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
 				if !ok {
 					return
 				}
-				c.checkpointsMu.Lock()
-				log.Debug("Accepting region flush event.",
-					zap.Stringer("range", logutil.StringifyRange(event.Key)),
-					zap.Uint64("checkpoint", event.Value))
-				c.checkpoints.Merge(event)
-				c.checkpointsMu.Unlock()
+				failpoint.Inject("subscription-handler-loop", func() {})
+				c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
+					if vsf == nil {
+						log.Warn("Span tree not found, perhaps stale event of removed tasks.",
+							zap.String("category", "log backup subscription manager"))
+						return
+					}
+					log.Debug("Accepting region flush event.",
+						zap.Stringer("range", logutil.StringifyRange(event.Key)),
+						zap.Uint64("checkpoint", event.Value))
+					vsf.Merge(event)
+				})
 			}
 		}
 	}()
diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go
index 037ff4949b2e2..1cd41dd0daa87 100644
--- a/br/pkg/streamhelper/advancer_test.go
+++ b/br/pkg/streamhelper/advancer_test.go
@@ -437,3 +437,29 @@ func TestOwnerDropped(t *testing.T) {
 		require.Equal(t, vsf.MinValue(), cp)
 	})
 }
+
+// TestRemoveTaskAndFlush tests the bug described in #50839.
+func TestRemoveTaskAndFlush(t *testing.T) {
+	log.SetLevel(zapcore.DebugLevel)
+	ctx := context.Background()
+	c := createFakeCluster(t, 4, true)
+	installSubscribeSupport(c)
+	env := &testEnv{
+		fakeCluster: c,
+		testCtx:     t,
+	}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.StartTaskListener(ctx)
+	adv.SpawnSubscriptionHandler(ctx)
+	require.NoError(t, adv.OnTick(ctx))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop", "pause"))
+	c.flushAll()
+	env.unregisterTask()
+	require.Eventually(t, func() bool {
+		return !adv.HasTask()
+	}, 10*time.Second, 100*time.Millisecond)
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
+	require.Eventually(t, func() bool {
+		return !adv.HasSubscribion()
+	}, 10*time.Second, 100*time.Millisecond)
+}
diff --git a/br/pkg/utils/worker.go b/br/pkg/utils/worker.go
index ba69752ca5d23..565ee651620a1 100644
--- a/br/pkg/utils/worker.go
+++ b/br/pkg/utils/worker.go
@@ -130,3 +130,17 @@ func PanicToErr(err *error) {
 		log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
 	}
 }
+
+// CatchAndLogPanic recovers if the execution panics, and logs the panic.
+// Generally, this would be used with `defer`, like:
+//
+//	func foo() {
+//		defer utils.CatchAndLogPanic()
+//		maybePanic()
+//	}
+func CatchAndLogPanic() {
+	item := recover()
+	if item != nil {
+		log.Warn("CatchAndLogPanic: panicked, but ignored.", zap.StackSkip("stack", 1), zap.Any("panic", item))
+	}
+}
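Not part of the patch: below is a minimal, self-contained sketch of the locking pattern the fix relies on, where the flush-event loop only touches the checkpoint span tree through a callback that holds the same mutex used when the task is removed, and tolerates the nil tree left behind by removal. The names here (`spanTree`, `advancer`, `withCheckpoints`, `setCheckpoints`) are simplified stand-ins for `spans.ValueSortedFull` and the advancer's `WithCheckpoints`/`setCheckpoints`; only the shape of the synchronization is meant to match.

```go
package main

import (
	"fmt"
	"sync"
)

// spanTree is a stand-in for the checkpoint span tree (spans.ValueSortedFull).
type spanTree struct{ checkpoints map[string]uint64 }

func (t *spanTree) merge(key string, ts uint64) { t.checkpoints[key] = ts }

type advancer struct {
	mu   sync.Mutex
	tree *spanTree // becomes nil once the task has been removed
}

// withCheckpoints mirrors WithCheckpoints: every access to the tree,
// including the nil check, happens under the same mutex.
func (a *advancer) withCheckpoints(f func(*spanTree)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	f(a.tree)
}

// setCheckpoints mirrors setCheckpoints: task removal clears the tree under
// the same mutex, so the event loop never dereferences a tree being cleared.
func (a *advancer) setCheckpoints(t *spanTree) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.tree = t
}

func main() {
	a := &advancer{tree: &spanTree{checkpoints: map[string]uint64{}}}
	done := make(chan struct{})

	// Event loop: flush events arriving after task removal are dropped
	// instead of dereferencing a nil tree (the panic described in #50839).
	go func() {
		defer close(done)
		for i := 0; i < 1000; i++ {
			a.withCheckpoints(func(t *spanTree) {
				if t == nil {
					return // stale event of a removed task
				}
				t.merge("region-1", uint64(i))
			})
		}
	}()

	a.setCheckpoints(nil) // the task is removed concurrently
	<-done
	fmt.Println("no panic: stale flush events were ignored")
}
```

The new test reproduces the same interleaving deterministically: it pauses the subscription handler loop at the `subscription-handler-loop` failpoint while flushes are pending, removes the task, then resumes the loop and checks that the subscriptions are eventually cleared without a panic.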