Skip to content

Commit

Permalink
br/streamhelper: fix panic while removing task (pingcap#50869)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored and BornChanger committed Feb 26, 2024
1 parent cd073ea commit df231d0
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 9 deletions.
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 20,
shard_count = 22,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
46 changes: 38 additions & 8 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// HasTask returns whether the advancer has been bound to a task.
func (c *CheckpointAdvancer) HasTask() bool {
c.taskMu.Lock()
defer c.taskMu.Unlock()

return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

return c.subscriber != nil && len(c.subscriber.subscriptions) > 0
}

// checkpoint represents the TS with specific range.
// it's only used in advancer.go.
type checkpoint struct {
Expand Down Expand Up @@ -363,6 +379,12 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
}()
}

func (c *CheckpointAdvancer) setCheckpoints(cps *spans.ValueSortedFull) {
c.checkpointsMu.Lock()
c.checkpoints = cps
c.checkpointsMu.Unlock()
}

func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
Expand All @@ -371,20 +393,20 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
utils.LogBackupTaskCountInc()
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
log.Info("added event", zap.Stringer("task", e.Info),
zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
c.setCheckpoints(nil)
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand Down Expand Up @@ -444,8 +466,10 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
defer c.subscriberMu.Unlock()
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
es := c.subscriber.Events()
log.Info("Subscription handler spawned.", zap.String("category", "log backup subscription manager"))

go func() {
defer utils.CatchAndLogPanic()
for {
select {
case <-ctx.Done():
Expand All @@ -454,12 +478,18 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
if !ok {
return
}
c.checkpointsMu.Lock()
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
c.checkpoints.Merge(event)
c.checkpointsMu.Unlock()
failpoint.Inject("subscription-handler-loop", func() {})
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
if vsf == nil {
log.Warn("Span tree not found, perhaps stale event of removed tasks.",
zap.String("category", "log backup subscription manager"))
return
}
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
vsf.Merge(event)
})
}
}
}()
Expand Down
26 changes: 26 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,29 @@ func TestOwnerDropped(t *testing.T) {
require.Equal(t, vsf.MinValue(), cp)
})
}

// TestRemoveTaskAndFlush tests the bug has been described in #50839.
func TestRemoveTaskAndFlush(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
ctx := context.Background()
c := createFakeCluster(t, 4, true)
installSubscribeSupport(c)
env := &testEnv{
fakeCluster: c,
testCtx: t,
}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.SpawnSubscriptionHandler(ctx)
require.NoError(t, adv.OnTick(ctx))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop", "pause"))
c.flushAll()
env.unregisterTask()
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}
9 changes: 9 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ type testEnv struct {
checkpoint uint64
testCtx *testing.T
ranges []kv.KeyRange
taskCh chan<- streamhelper.TaskEvent

resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)

Expand All @@ -596,6 +597,7 @@ func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) e
Ranges: rngs,
}
ch <- tsk
t.taskCh = ch
return nil
}

Expand Down Expand Up @@ -625,6 +627,13 @@ func (t *testEnv) getCheckpoint() uint64 {
return t.checkpoint
}

func (t *testEnv) unregisterTask() {
t.taskCh <- streamhelper.TaskEvent{
Type: streamhelper.EventDel,
Name: "whole",
}
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,17 @@ func PanicToErr(err *error) {
log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
}
}

// CatchAndLogPanic recovers when the execution get panicked, and log the panic.
// generally, this would be used with `defer`, like:
//
// func foo() {
// defer utils.CatchAndLogPanic()
// maybePanic()
// }
func CatchAndLogPanic() {
item := recover()
if item != nil {
log.Warn("CatchAndLogPanic: panicked, but ignored.", zap.StackSkip("stack", 1), zap.Any("panic", item))
}
}

0 comments on commit df231d0

Please sign in to comment.