Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#50869
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
YuJuncen authored and ti-chi-bot committed Feb 7, 2024
1 parent 8f2144a commit 5ddab6c
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 8 deletions.
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ go_test(
],
flaky = True,
race = "on",
<<<<<<< HEAD
shard_count = 20,
=======
shard_count = 22,
>>>>>>> 00f99ab1c41 (br/streamhelper: fix panic while removing task (#50869))
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
46 changes: 38 additions & 8 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// HasTask reports whether a task is currently bound to the advancer.
// The task field is read while holding taskMu.
func (c *CheckpointAdvancer) HasTask() bool {
	c.taskMu.Lock()
	bound := c.task != nil
	c.taskMu.Unlock()

	return bound
}

// HasSubscribion reports whether the advancer currently holds a subscriber
// that has at least one subscription. The check is made while holding
// subscriberMu.
func (c *CheckpointAdvancer) HasSubscribion() bool {
	c.subscriberMu.Lock()
	defer c.subscriberMu.Unlock()

	// No subscriber at all means there cannot be any subscription.
	if c.subscriber == nil {
		return false
	}
	return len(c.subscriber.subscriptions) > 0
}

// checkpoint represents the TS with specific range.
// it's only used in advancer.go.
type checkpoint struct {
Expand Down Expand Up @@ -361,6 +377,12 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
}()
}

// setCheckpoints replaces the advancer's checkpoints with cps while holding
// checkpointsMu. Callers may pass nil to drop the current checkpoints.
func (c *CheckpointAdvancer) setCheckpoints(cps *spans.ValueSortedFull) {
	c.checkpointsMu.Lock()
	defer c.checkpointsMu.Unlock()

	c.checkpoints = cps
}

func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
Expand All @@ -369,7 +391,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
utils.LogBackupTaskCountInc()
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
if err != nil {
Expand All @@ -381,12 +403,12 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the nil check because some test cases don't equip the advancer with a subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
c.setCheckpoints(nil)
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand Down Expand Up @@ -448,8 +470,10 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
defer c.subscriberMu.Unlock()
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
es := c.subscriber.Events()
log.Info("Subscription handler spawned.", zap.String("category", "log backup subscription manager"))

go func() {
defer utils.CatchAndLogPanic()
for {
select {
case <-ctx.Done():
Expand All @@ -458,12 +482,18 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
if !ok {
return
}
c.checkpointsMu.Lock()
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
c.checkpoints.Merge(event)
c.checkpointsMu.Unlock()
failpoint.Inject("subscription-handler-loop", func() {})
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
if vsf == nil {
log.Warn("Span tree not found, perhaps stale event of removed tasks.",
zap.String("category", "log backup subscription manager"))
return
}
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
vsf.Merge(event)
})
}
}
}()
Expand Down
26 changes: 26 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,29 @@ func TestOwnerDropped(t *testing.T) {
require.Equal(t, vsf.MinValue(), cp)
})
}

// TestRemoveTaskAndFlush tests the bug described in #50839.
//
// The test pauses the subscription handler loop via a failpoint, flushes all
// regions, unregisters the task, and waits until the advancer no longer has a
// task. It then resumes the handler loop and waits until the advancer no
// longer has any subscription.
func TestRemoveTaskAndFlush(t *testing.T) {
	const handlerLoopFailpoint = "github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"

	log.SetLevel(zapcore.DebugLevel)
	ctx := context.Background()
	c := createFakeCluster(t, 4, true)
	installSubscribeSupport(c)
	env := &testEnv{
		fakeCluster: c,
		testCtx:     t,
	}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)
	adv.SpawnSubscriptionHandler(ctx)
	require.NoError(t, adv.OnTick(ctx))

	require.NoError(t, failpoint.Enable(handlerLoopFailpoint, "pause"))
	// Failpoints are process-global. If an assertion below aborts the test
	// before the explicit Disable, make sure the handler loop is not left
	// paused for whatever runs next.
	t.Cleanup(func() {
		// The error is ignored on purpose: on the success path the failpoint
		// has already been disabled by the time the cleanup runs.
		_ = failpoint.Disable(handlerLoopFailpoint)
	})

	c.flushAll()
	env.unregisterTask()
	require.Eventually(t, func() bool {
		return !adv.HasTask()
	}, 10*time.Second, 100*time.Millisecond)

	// Resume the handler loop so it processes the flush events that were
	// queued while the task was being removed.
	require.NoError(t, failpoint.Disable(handlerLoopFailpoint))
	require.Eventually(t, func() bool {
		return !adv.HasSubscribion()
	}, 10*time.Second, 100*time.Millisecond)
}
46 changes: 46 additions & 0 deletions br/pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,49 @@ func PanicToErr(err *error) {
log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
}
}
<<<<<<< HEAD
=======

// CatchAndLogPanic recovers from a panic in the current goroutine and logs it
// as a warning instead of letting it propagate.
//
// It must be deferred directly, because recover only has an effect when it is
// called by the deferred function itself:
//
//	func foo() {
//		defer utils.CatchAndLogPanic()
//		maybePanic()
//	}
func CatchAndLogPanic() {
	recovered := recover()
	if recovered == nil {
		// Nothing panicked; there is nothing to report.
		return
	}
	log.Warn("CatchAndLogPanic: panicked, but ignored.", zap.StackSkip("stack", 1), zap.Any("panic", recovered))
}

// Result is the element type of the channel returned by AsyncStreamBy.
// It carries either an item or an error.
type Result[T any] struct {
// Err is set by AsyncStreamBy when the generator returned an error.
Err error
// Item is set by AsyncStreamBy when the generator returned without error.
Item T
}

// AsyncStreamBy runs generator repeatedly in a new goroutine and streams each
// outcome through the returned unbuffered channel.
//
// Every successful call is delivered as a Result holding the item. The first
// error is delivered as a Result holding only the error, after which the
// goroutine stops and the channel is closed.
func AsyncStreamBy[T any](generator func() (T, error)) <-chan Result[T] {
	results := make(chan Result[T])

	produce := func() {
		defer close(results)
		for {
			value, genErr := generator()
			if genErr == nil {
				results <- Result[T]{Item: value}
				continue
			}
			// Report the failure without the item, then stop producing.
			results <- Result[T]{Err: genErr}
			return
		}
	}
	go produce()

	return results
}

// BuildWorkerTokenChannel returns a buffered channel of capacity size that is
// already filled with size empty-struct tokens.
func BuildWorkerTokenChannel(size uint) chan struct{} {
	tokens := make(chan struct{}, size)
	for remaining := size; remaining > 0; remaining-- {
		tokens <- struct{}{}
	}
	return tokens
}
>>>>>>> 00f99ab1c41 (br/streamhelper: fix panic while removing task (#50869))

0 comments on commit 5ddab6c

Please sign in to comment.