Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/operator: fix stuck when terminating #52264

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
":prepare_snap",
"//br/pkg/utils",
Expand Down
37 changes: 29 additions & 8 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,36 @@ func (p *Preparer) Finalize(ctx context.Context) error {
return nil
})
}
if err := eg.Wait(); err != nil {
logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err))
return err
}
logutil.CL(ctx).Info("all connections to store have shuted down.")
errCh := make(chan error, 1)
go func() {
if err := eg.Wait(); err != nil {
logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err))
errCh <- err
return
}
logutil.CL(ctx).Info("all connections to store have shuted down.")
errCh <- nil
}()
for {
select {
case event := <-p.eventChan:
case event, ok := <-p.eventChan:
if !ok {
return nil
}
if err := p.onEvent(ctx, event); err != nil {
return err
}
default:
return nil
case err, ok := <-errCh:
if !ok {
panic("unreachable.")
}
if err != nil {
return err
}
// All streams are finialized, they shouldn't send more events to event chan.
close(p.eventChan)
case <-ctx.Done():
return ctx.Err()
}
}
}
Expand Down Expand Up @@ -407,6 +424,10 @@ func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream
}

func (p *Preparer) createAndCacheStream(ctx context.Context, cli PrepareClient, storeID uint64) error {
if _, ok := p.clients[storeID]; ok {
return nil
}

s := new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
Expand Down
72 changes: 65 additions & 7 deletions br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ type mockStore struct {

successRegions []metapb.Region
onWaitApply func(*metapb.Region) error
now func() time.Time

waitApplyDelay func()
delaiedWaitApplies sync.WaitGroup

injectConnErr <-chan error
now func() time.Time
}

func (s *mockStore) Send(req *brpb.PrepareSnapshotBackupRequest) error {
Expand All @@ -67,7 +72,16 @@ func (s *mockStore) Send(req *brpb.PrepareSnapshotBackupRequest) error {
}
}
}
s.sendResp(resp)
if s.waitApplyDelay != nil {
s.delaiedWaitApplies.Add(1)
go func() {
defer s.delaiedWaitApplies.Done()
s.waitApplyDelay()
s.sendResp(resp)
}()
} else {
s.sendResp(resp)
}
if resp.Error == nil {
s.successRegions = append(s.successRegions, *region)
}
Expand Down Expand Up @@ -100,11 +114,21 @@ func (s *mockStore) sendResp(resp brpb.PrepareSnapshotBackupResponse) {
}

func (s *mockStore) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
out, ok := <-s.output
if !ok {
return nil, io.EOF
for {
select {
case out, ok := <-s.output:
if !ok {
return nil, io.EOF
}
return &out, nil
case err, ok := <-s.injectConnErr:
if ok {
return nil, err
} else {
s.injectConnErr = nil
}
}
}
return &out, nil
}

type mockStores struct {
Expand Down Expand Up @@ -167,7 +191,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
s, ok := m.stores[storeID]
if !ok || s == nil {
m.stores[storeID] = &mockStore{
output: make(chan brpb.PrepareSnapshotBackupResponse, 16),
output: make(chan brpb.PrepareSnapshotBackupResponse, 20480),
successRegions: []metapb.Region{},
onWaitApply: func(r *metapb.Region) error {
return nil
Expand Down Expand Up @@ -538,3 +562,37 @@ func TestHooks(t *testing.T) {
req.NoError(adv.Finalize(context.Background()))
ms.AssertIsNormalMode(t)
}

func TestManyMessagesWhenFinalizing(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(10240)...)
ms := newTestEnv(pdc)
blockCh := make(chan struct{})
injectErr := make(chan error)
ms.onCreateStore = func(ms *mockStore) {
ms.waitApplyDelay = func() {
<-blockCh
}
ms.injectConnErr = injectErr
}
prep := New(ms)
ctx := context.Background()
req.NoError(prep.PrepareConnections(ctx))
errC := async(func() error { return prep.DriveLoopAndWaitPrepare(ctx) })
injectErr <- errors.NewNoStackError("whoa!")
req.Error(<-errC)
close(blockCh)
for _, s := range ms.stores {
s.delaiedWaitApplies.Wait()
}
// Closing the stream should be error.
req.Error(prep.Finalize(ctx))
}

func async[T any](f func() T) <-chan T {
ch := make(chan T)
go func() {
ch <- f()
}()
return ch
}
10 changes: 9 additions & 1 deletion br/pkg/backup/prepare_snap/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func (p *prepareStream) InitConn(ctx context.Context, cli PrepareClient) error {
return p.GoLeaseLoop(ctx, p.leaseDuration)
}

// Finalize cuts down this connection and remove the lease.
// This will block until all messages has been flushed to `output` channel.
// After this return, no more messages should be appended to the `output` channel.
func (p *prepareStream) Finalize(ctx context.Context) error {
log.Info("shutting down", zap.Uint64("store", p.storeID))
return p.stopClientLoop(ctx)
Expand Down Expand Up @@ -151,7 +154,8 @@ func (p *prepareStream) clientLoop(ctx context.Context, dur time.Duration) error
return nil
case res := <-p.serverStream:
if err := p.onResponse(ctx, res); err != nil {
p.sendErr(errors.Annotate(err, "failed to recv from the stream"))
err = errors.Annotate(err, "failed to recv from the stream")
p.sendErr(err)
return err
}
case <-ticker.C:
Expand Down Expand Up @@ -186,6 +190,10 @@ func (p *prepareStream) sendErr(err error) {
}

func (p *prepareStream) convertToEvent(resp *brpb.PrepareSnapshotBackupResponse) (event, bool) {
if resp == nil {
log.Warn("Received nil message, that shouldn't happen in a normal cluster.", zap.Uint64("store", p.storeID))
return event{}, false
}
switch resp.Ty {
case brpb.PrepareSnapshotBackupEventType_WaitApplyDone:
return event{
Expand Down
Loading