Skip to content

Commit

Permalink
br: pick some EBS backup bug fixes to 7.5 (pingcap#53027)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored May 7, 2024
1 parent dbd8ea2 commit 1e19051
Show file tree
Hide file tree
Showing 14 changed files with 397 additions and 102 deletions.
42 changes: 7 additions & 35 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand All @@ -35,7 +36,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 7,
shard_count = 10,
deps = [
":prepare_snap",
"//br/pkg/utils",
Expand Down
36 changes: 32 additions & 4 deletions br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package preparesnap
import (
"context"
"slices"
"sync"
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
return withoutTiFlash, err
}

func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
return &gRPCGoAdapter{
inner: p,
}
}

// GrpcGoAdapter makes the `Send` call synchronous.
// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe.
// But concurrency call to `send` and `recv` is safe.
// This type is exported for testing.
type gRPCGoAdapter struct {
inner PrepareClient
sendMu sync.Mutex
recvMu sync.Mutex
}

func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.inner.Send(req)
}

func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
s.recvMu.Lock()
defer s.recvMu.Unlock()
return s.inner.Recv()
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
Expand All @@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
if err != nil {
return nil, err
}
return cli, nil
return &gRPCGoAdapter{inner: cli}, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand All @@ -151,9 +180,8 @@ type RetryAndSplitRequestEnv struct {
}

func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
// Retry for about 2 minutes.
rs := utils.InitialRetryState(12, 10*time.Second, 10*time.Second)
bo := utils.Backoffer(&rs)
rs := utils.ConstantBackoff(10 * time.Second)
bo := utils.Backoffer(rs)
if r.GetBackoffer != nil {
bo = r.GetBackoffer()
}
Expand Down
97 changes: 75 additions & 22 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -91,6 +92,9 @@ type Preparer struct {
RetryBackoff time.Duration
RetryLimit int
LeaseDuration time.Duration

/* Observers. Initialize them before starting.*/
AfterConnectionsEstablished func()
}

func New(env Env) *Preparer {
Expand Down Expand Up @@ -155,10 +159,13 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
zap.Int("retry_limit", p.RetryLimit),
zap.Duration("lease_duration", p.LeaseDuration))
p.retryTime = 0
if err := p.prepareConnections(ctx); err != nil {
if err := p.PrepareConnections(ctx); err != nil {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
if p.AfterConnectionsEstablished != nil {
p.AfterConnectionsEstablished()
}
if err := p.AdvanceState(ctx); err != nil {
log.Error("failed to check the progress of our work", logutil.ShortError(err))
return errors.Annotate(err, "failed to begin step")
Expand Down Expand Up @@ -186,19 +193,36 @@ func (p *Preparer) Finalize(ctx context.Context) error {
return nil
})
}
if err := eg.Wait(); err != nil {
logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err))
return err
}
logutil.CL(ctx).Info("all connections to store have shuted down.")
errCh := make(chan error, 1)
go func() {
if err := eg.Wait(); err != nil {
logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err))
errCh <- err
return
}
logutil.CL(ctx).Info("all connections to store have shuted down.")
errCh <- nil
}()
for {
select {
case event := <-p.eventChan:
case event, ok := <-p.eventChan:
if !ok {
return nil
}
if err := p.onEvent(ctx, event); err != nil {
return err
}
default:
return nil
case err, ok := <-errCh:
if !ok {
panic("unreachable.")
}
if err != nil {
return err
}
// All streams are finialized, they shouldn't send more events to event chan.
close(p.eventChan)
case <-ctx.Done():
return ctx.Err()
}
}
}
Expand Down Expand Up @@ -385,23 +409,35 @@ func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) erro
}

func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream, error) {
s, ok := p.clients[storeID]
_, ok := p.clients[storeID]
if !ok {
log.Warn("stream of store found a store not established connection", zap.Uint64("store", storeID))
cli, err := p.env.ConnectToStore(ctx, storeID)
if err != nil {
return nil, errors.Annotatef(err, "failed to dial store %d", storeID)
}
s = new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err = s.InitConn(ctx, cli)
if err != nil {
return nil, err
if err := p.createAndCacheStream(ctx, cli, storeID); err != nil {
return nil, errors.Annotatef(err, "failed to create and cache stream for store %d", storeID)
}
p.clients[storeID] = s
}
return s, nil
return p.clients[storeID], nil
}

func (p *Preparer) createAndCacheStream(ctx context.Context, cli PrepareClient, storeID uint64) error {
if _, ok := p.clients[storeID]; ok {
return nil
}

s := new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err := s.InitConn(ctx, cli)
if err != nil {
return err
}
p.clients[storeID] = s
return nil
}

func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
Expand All @@ -414,17 +450,34 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
p.inflightReqs[region.GetMeta().Id] = *region.GetMeta()
}

func (p *Preparer) prepareConnections(ctx context.Context) error {
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
failpoint.Inject("PrepareConnectionsErr", func() {
failpoint.Return(errors.New("mock PrepareConnectionsErr"))
})
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
return errors.Annotate(err, "failed to get all live stores")
}

log.Info("Start to initialize the connections.", zap.Int("stores", len(stores)))
clients := map[uint64]PrepareClient{}
for _, store := range stores {
_, err := p.streamOf(ctx, store.Id)
cli, err := p.env.ConnectToStore(ctx, store.Id)
if err != nil {
return errors.Annotatef(err, "failed to prepare connection to store %d", store.Id)
return errors.Annotatef(err, "failed to dial the store %d", store.Id)
}
clients[store.Id] = cli
}

for id, cli := range clients {
log.Info("Start to pause the admin commands.", zap.Uint64("store", id))
if err := p.createAndCacheStream(ctx, cli, id); err != nil {
return errors.Annotatef(err, "failed to create and cache stream for store %d", id)
}
}

return nil
}
Loading

0 comments on commit 1e19051

Please sign in to comment.