This is an automated cherry-pick of pingcap#52671
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
YuJuncen authored and ti-chi-bot committed May 31, 2024
1 parent d9c5bf6 commit 7006811
Showing 4 changed files with 124 additions and 14 deletions.
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -68,7 +68,11 @@ go_test(
],
flaky = True,
race = "on",
<<<<<<< HEAD
shard_count = 26,
=======
shard_count = 27,
>>>>>>> f01f305fabb (log_backup: fix panic during advancer owner transfer (#52671))
deps = [
":streamhelper",
"//br/pkg/errors",
9 changes: 9 additions & 0 deletions br/pkg/streamhelper/advancer.go
@@ -697,3 +697,12 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar
c.inResolvingLock.Store(false)
}()
}

func (c *CheckpointAdvancer) TEST_registerCallbackForSubscriptions(f func()) int {
cnt := 0
for _, sub := range c.subscriber.subscriptions {
sub.onDaemonExit = f
cnt++
}
return cnt
}
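The TEST_ hook above gives tests a way to count the running subscription daemons and receive a callback when each one exits. A minimal usage sketch, mirroring the tests in the next file (adv, ctx, and cancel are assumed to come from the usual test setup):

	wg := new(sync.WaitGroup)
	// One Done() per subscription daemon; Add exactly the count the hook returns.
	wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))
	cancel()  // simulate the advancer losing ownership
	wg.Wait() // must return promptly: every listenOver goroutine has to observe ctx.Done()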
53 changes: 53 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
@@ -520,6 +520,7 @@ func TestCheckPointResume(t *testing.T) {
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

<<<<<<< HEAD
func TestUnregisterAfterPause(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
@@ -546,4 +547,56 @@ func TestUnregisterAfterPause(t *testing.T) {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
=======
func TestOwnershipLost(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 10240)...)
installSubscribeSupport(c)
ctx, cancel := context.WithCancel(context.Background())
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.OnBecomeOwner(ctx)
require.NoError(t, adv.OnTick(ctx))
c.advanceCheckpoints()
c.flushAll()
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "pause")
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/FlushSubscriber.Clear.timeoutMs", "return(500)")
wg := new(sync.WaitGroup)
wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))
cancel()
failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend")
wg.Wait()
}

func TestSubscriptionPanic(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter(manyRegions(0, 20)...)
installSubscribeSupport(c)
ctx, cancel := context.WithCancel(context.Background())
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.OnStart(ctx)
adv.OnBecomeOwner(ctx)
wg := new(sync.WaitGroup)
wg.Add(adv.TEST_registerCallbackForSubscriptions(wg.Done))

require.NoError(t, adv.OnTick(ctx))
failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription.listenOver.aboutToSend", "5*panic")
ckpt := c.advanceCheckpoints()
c.flushAll()
cnt := 0
for {
require.NoError(t, adv.OnTick(ctx))
cnt++
if env.checkpoint >= ckpt {
break
}
if cnt > 100 {
t.Fatalf("After 100 ticks, the progress still cannot be advanced.")
}
}
cancel()
wg.Wait()
>>>>>>> f01f305fabb (log_backup: fix panic during advancer owner transfer (#52671))
}
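For reference, the failpoint expressions used in these tests follow the gofail term syntax; a hedged summary (failpoint paths shortened here for illustration):

	// "pause" blocks every goroutine reaching the failpoint until Disable is called.
	failpoint.Enable(".../subscription.listenOver.aboutToSend", "pause")
	// "5*panic" panics on the first five evaluations, then becomes a no-op.
	failpoint.Enable(".../subscription.listenOver.aboutToSend", "5*panic")
	// "return(500)" hands the value 500 to the corresponding failpoint.Inject callback.
	failpoint.Enable(".../FlushSubscriber.Clear.timeoutMs", "return(500)")

TestOwnershipLost pauses the daemons right before they send on the output channel, cancels the advancer's context, and then requires every daemon to exit; TestSubscriptionPanic checks that the advancer still makes progress after the subscription goroutine has panicked a few times.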
72 changes: 58 additions & 14 deletions br/pkg/streamhelper/flush_subscriber.go
@@ -11,8 +11,10 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/pingcap/tidb/metrics"
@@ -23,6 +25,11 @@
"google.golang.org/grpc/status"
)

const (
// clearSubscriberTimeOut is the timeout for clearing the subscriber.
clearSubscriberTimeOut = 1 * time.Minute
)

// FlushSubscriber maintains the state of subscribing to the cluster.
type FlushSubscriber struct {
dialer LogBackupService
@@ -86,17 +93,30 @@ func (f *FlushSubscriber) UpdateStoreTopology(ctx context.Context) error {
for id := range f.subscriptions {
_, ok := storeSet[id]
if !ok {
f.removeSubscription(id)
f.removeSubscription(ctx, id)
}
}
return nil
}

// Clear clears all the subscriptions.
func (f *FlushSubscriber) Clear() {
<<<<<<< HEAD
log.Info("[log backup flush subscriber] Clearing.")
=======
timeout := clearSubscriberTimeOut
failpoint.Inject("FlushSubscriber.Clear.timeoutMs", func(v failpoint.Value) {
//nolint:durationcheck
timeout = time.Duration(v.(int)) * time.Millisecond
})
log.Info("Clearing.",
zap.String("category", "log backup flush subscriber"),
zap.Duration("timeout", timeout))
cx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
>>>>>>> f01f305fabb (log_backup: fix panic during advancer owner transfer (#52671))
for id := range f.subscriptions {
f.removeSubscription(id)
f.removeSubscription(cx, id)
}
}

@@ -132,15 +152,11 @@ type eventStream = logbackup.LogBackup_SubscribeFlushEventClient

type joinHandle <-chan struct{}

func (jh joinHandle) WaitTimeOut(dur time.Duration) {
var t <-chan time.Time
if dur > 0 {
t = time.After(dur)
}
func (jh joinHandle) Wait(ctx context.Context) {
select {
case <-jh:
case <-t:
log.Warn("join handle timed out.")
case <-ctx.Done():
log.Warn("join handle timed out.", zap.StackSkip("caller", 1))
}
}
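Wait is now bounded by the caller's context instead of a fixed one-minute timer, which lets Clear place a single deadline over all subscriptions. A minimal sketch of a caller, under the same signature:

	cx, cancel := context.WithTimeout(context.Background(), clearSubscriberTimeOut)
	defer cancel()
	jh.Wait(cx) // returns as soon as the goroutine exits, or once the deadline passes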

@@ -171,6 +187,8 @@ type subscription struct {
// we need to try to reconnect even if there is an error that cannot be retried.
storeBootAt uint64
output chan<- spans.Valued

onDaemonExit func()
}

func (s *subscription) emitError(err error) {
@@ -213,7 +231,7 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e
log.Info("[log backup subscription manager] Adding subscription.", zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt))
// We should shut down the background task first.
// Once it yields some error during shutting down, the error won't be brought to the next run.
s.close()
s.close(ctx)
s.clearError()

c, err := dialer.GetLogBackupClient(ctx, s.storeID)
@@ -236,10 +254,10 @@ func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) e
return nil
}

func (s *subscription) close() {
func (s *subscription) close(ctx context.Context) {
if s.cancel != nil {
s.cancel()
s.background.WaitTimeOut(1 * time.Minute)
s.background.Wait(ctx)
}
// HACK: don't close the internal channel here,
// because it is an ever-shared channel.
@@ -248,6 +266,16 @@
func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
storeID := s.storeID
logutil.CL(ctx).Info("Listen starting.", zap.Uint64("store", storeID))
defer func() {
if s.onDaemonExit != nil {
s.onDaemonExit()
}

if pData := recover(); pData != nil {
log.Warn("Subscriber panicked.", zap.Uint64("store", storeID), zap.Any("panic-data", pData), zap.Stack("stack"))
s.emitError(errors.Annotatef(berrors.ErrUnknown, "panic during executing: %v", pData))
}
}()
for {
// Shall we use RecvMsg for better performance?
// Note that the spans.Full requires the input slice be immutable.
@@ -262,6 +290,7 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
return
}

log.Debug("Sending events.", zap.Int("size", len(msg.Events)))
for _, m := range msg.Events {
start, err := decodeKey(m.StartKey)
if err != nil {
@@ -275,13 +304,22 @@ func (s *subscription) listenOver(ctx context.Context, cli eventStream) {
logutil.Key("event", m.EndKey), logutil.ShortError(err))
continue
}
s.output <- spans.Valued{
failpoint.Inject("subscription.listenOver.aboutToSend", func() {})

evt := spans.Valued{
Key: spans.Span{
StartKey: start,
EndKey: end,
},
Value: m.Checkpoint,
}
select {
case s.output <- evt:
case <-ctx.Done():
logutil.CL(ctx).Warn("Context canceled while sending events.",
zap.Uint64("store", storeID))
return
}
}
metrics.RegionCheckpointSubscriptionEvent.WithLabelValues(strconv.Itoa(int(storeID))).Add(float64(len(msg.Events)))
}
@@ -291,11 +329,17 @@ func (f *FlushSubscriber) addSubscription(ctx context.Context, toStore Store) {
f.subscriptions[toStore.ID] = newSubscription(toStore, f.eventsTunnel)
}

func (f *FlushSubscriber) removeSubscription(toStore uint64) {
func (f *FlushSubscriber) removeSubscription(ctx context.Context, toStore uint64) {
subs, ok := f.subscriptions[toStore]
if ok {
<<<<<<< HEAD
log.Info("[log backup subscription manager] Removing subscription.", zap.Uint64("store", toStore))
subs.close()
=======
log.Info("Removing subscription.", zap.String("category", "log backup subscription manager"),
zap.Uint64("store", toStore))
subs.close(ctx)
>>>>>>> f01f305fabb (log_backup: fix panic during advancer owner transfer (#52671))
delete(f.subscriptions, toStore)
}
}
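The heart of the fix is the select in listenOver above: the send on s.output now races against ctx.Done(), so a consumer that went away during owner transfer can no longer block the daemon forever, and the deferred recover converts a late panic into an emitted error instead of crashing the process. A standalone sketch of the send pattern (sendOrBail is a hypothetical helper, not part of this patch):

	package sketch // illustrative only

	import "context"

	// sendOrBail delivers v on out unless ctx is canceled first.
	func sendOrBail[T any](ctx context.Context, out chan<- T, v T) bool {
		select {
		case out <- v:
			return true
		case <-ctx.Done():
			return false
		}
	}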
