Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/streamhelper: fix panic while removing task (#50869) #51305

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 20,
shard_count = 22,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
46 changes: 38 additions & 8 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@
subscriberMu sync.Mutex
}

// HasTask returns whether the advancer has been bound to a task.
func (c *CheckpointAdvancer) HasTask() bool {
c.taskMu.Lock()
defer c.taskMu.Unlock()

return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

return c.subscriber != nil && len(c.subscriber.subscriptions) > 0
}

// checkpoint represents the TS with specific range.
// it's only used in advancer.go.
type checkpoint struct {
Expand Down Expand Up @@ -363,6 +379,12 @@
}()
}

func (c *CheckpointAdvancer) setCheckpoints(cps *spans.ValueSortedFull) {
c.checkpointsMu.Lock()
c.checkpoints = cps
c.checkpointsMu.Unlock()
}

func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
Expand All @@ -371,20 +393,20 @@
utils.LogBackupTaskCountInc()
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
log.Info("added event", zap.Stringer("task", e.Info),
zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
c.setCheckpoints(nil)
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand Down Expand Up @@ -444,8 +466,10 @@
defer c.subscriberMu.Unlock()
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
es := c.subscriber.Events()
log.Info("Subscription handler spawned.", zap.String("category", "log backup subscription manager"))

go func() {
defer utils.CatchAndLogPanic()
for {
select {
case <-ctx.Done():
Expand All @@ -454,12 +478,18 @@
if !ok {
return
}
c.checkpointsMu.Lock()
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
c.checkpoints.Merge(event)
c.checkpointsMu.Unlock()
failpoint.Inject("subscription-handler-loop", func() {})
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
if vsf == nil {
log.Warn("Span tree not found, perhaps stale event of removed tasks.",
zap.String("category", "log backup subscription manager"))
return
}
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
vsf.Merge(event)

Check warning on line 491 in br/pkg/streamhelper/advancer.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/streamhelper/advancer.go#L488-L491

Added lines #L488 - L491 were not covered by tests
})
}
}
}()
Expand Down
26 changes: 26 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,29 @@ func TestOwnerDropped(t *testing.T) {
require.Equal(t, vsf.MinValue(), cp)
})
}

// TestRemoveTaskAndFlush tests the bug has been described in #50839.
func TestRemoveTaskAndFlush(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
ctx := context.Background()
c := createFakeCluster(t, 4, true)
installSubscribeSupport(c)
env := &testEnv{
fakeCluster: c,
testCtx: t,
}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.SpawnSubscriptionHandler(ctx)
require.NoError(t, adv.OnTick(ctx))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop", "pause"))
c.flushAll()
env.unregisterTask()
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}
9 changes: 9 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ type testEnv struct {
checkpoint uint64
testCtx *testing.T
ranges []kv.KeyRange
taskCh chan<- streamhelper.TaskEvent

resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)

Expand All @@ -596,6 +597,7 @@ func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) e
Ranges: rngs,
}
ch <- tsk
t.taskCh = ch
return nil
}

Expand Down Expand Up @@ -625,6 +627,13 @@ func (t *testEnv) getCheckpoint() uint64 {
return t.checkpoint
}

func (t *testEnv) unregisterTask() {
t.taskCh <- streamhelper.TaskEvent{
Type: streamhelper.EventDel,
Name: "whole",
}
}

func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
for _, r := range t.regions {
if len(r.locks) != 0 {
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,17 @@
log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
}
}

// CatchAndLogPanic recovers when the execution get panicked, and log the panic.
// generally, this would be used with `defer`, like:
//
// func foo() {
// defer utils.CatchAndLogPanic()
// maybePanic()
// }
func CatchAndLogPanic() {
item := recover()
if item != nil {
log.Warn("CatchAndLogPanic: panicked, but ignored.", zap.StackSkip("stack", 1), zap.Any("panic", item))
}

Check warning on line 145 in br/pkg/utils/worker.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/worker.go#L144-L145

Added lines #L144 - L145 were not covered by tests
}
Loading