Skip to content

Commit

Permalink
make gRPC connections synced
Browse files Browse the repository at this point in the history
Signed-off-by: Yu Juncen <yu745514916@live.com>
  • Loading branch information
YuJuncen committed Mar 25, 2024
1 parent 9b6db9d commit 385205f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
31 changes: 30 additions & 1 deletion br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package preparesnap
import (
"context"
"slices"
"sync"
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
return withoutTiFlash, err
}

func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
return &gRPCGoAdapter{
inner: p,
}
}

// GrpcGoAdapter makes the `Send` call synchronous.
// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe.
// But concurrency call to `send` and `recv` is safe.
// This type is exported for testing.
type gRPCGoAdapter struct {
inner PrepareClient
sendMu sync.Mutex
recvMu sync.Mutex
}

func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.inner.Send(req)
}

func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
s.recvMu.Lock()
defer s.recvMu.Unlock()
return s.inner.Recv()
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
Expand All @@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
if err != nil {
return nil, err
}
return cli, nil
return &gRPCGoAdapter{inner: cli}, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
}
m.onCreateStore(m.stores[storeID])
}
return m.stores[storeID], nil
return AdaptForGRPCInTest(m.stores[storeID]), nil
}

func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down

0 comments on commit 385205f

Please sign in to comment.