Skip to content

Commit

Permalink
client, server: add ScanRegions gRPC protocol support (#1535)
Browse files Browse the repository at this point in the history
* client, server: support ScanRegions gRPC protocol

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored May 17, 2019
1 parent 6febb24 commit 65eb45a
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 3 deletions.
27 changes: 27 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type Client interface {
GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -681,6 +686,28 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re
return resp.GetRegion(), resp.GetLeader(), nil
}

func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer cmdDuration.WithLabelValues("scan_regions").Observe(time.Since(start).Seconds())
ctx, cancel := context.WithTimeout(ctx, pdTimeout)
resp, err := c.leaderClient().ScanRegions(ctx, &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
Limit: int32(limit),
})
cancel()
if err != nil {
cmdFailedDuration.WithLabelValues("scan_regions").Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
}
return resp.GetRegions(), resp.GetLeaders(), nil
}

func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context()))
Expand Down
58 changes: 58 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,64 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
c.Succeed()
}

func (s *testClientSuite) TestScanRegions(c *C) {
regionLen := 10
regions := make([]*metapb.Region, 0, regionLen)
for i := 0; i < regionLen; i++ {
regionID := regionIDAllocator.alloc()
r := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: peers,
}
regions = append(regions, r)
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(s.srv),
Region: r,
Leader: peers[0],
}
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)
}

// Wait for region heartbeats.
testutil.WaitUntil(c, func(c *C) bool {
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, 10)
return err == nil && len(scanRegions) == 10
})

// Set leader of region3 to nil.
region3 := core.NewRegionInfo(regions[3], nil)
s.srv.GetRaftCluster().HandleRegionHeartbeat(region3)

check := func(start []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, limit)
c.Assert(err, IsNil)
c.Assert(scanRegions, HasLen, len(expect))
c.Assert(leaders, HasLen, len(expect))
c.Log("scanRegions", scanRegions)
c.Log("expect", expect)
c.Log("scanLeaders", leaders)
for i := range expect {
c.Assert(scanRegions[i], DeepEquals, expect[i])
if scanRegions[i].GetId() == region3.GetID() {
c.Assert(leaders[i], DeepEquals, &metapb.Peer{})
} else {
c.Assert(leaders[i], DeepEquals, expect[i].Peers[0])
}
}
}

check([]byte{0}, 10, regions)
check([]byte{1}, 5, regions[1:6])
check([]byte{100}, 1, nil)
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/errors v0.10.1 // indirect
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ github.com/pingcap/errors v0.10.1 h1:fGVuPMtwNcxbzQ3aoRyyi6kxvXKMkEsceP81f3b8wsk
github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7xvRV6DzvPkKY4QXzfVbjU1BhW0d9yL8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d h1:LJYJl+cBhkkTWD79n+n9Bp4agQ85SdF9YKY4zEtL9Kw=
github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c h1:pY/MQQ5UajEHfSnQS8rFAM9gw9bBKzqBl414cdfhpRQ=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
23 changes: 23 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,29 @@ func (s *Server) GetRegionByID(ctx context.Context, request *pdpb.GetRegionByIDR
}, nil
}

// ScanRegions implements gRPC PDServer.
func (s *Server) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}

cluster := s.GetRaftCluster()
if cluster == nil {
return &pdpb.ScanRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
regions := cluster.ScanRegionsByKey(request.GetStartKey(), int(request.GetLimit()))
resp := &pdpb.ScanRegionsResponse{Header: s.header()}
for _, r := range regions {
leader := r.GetLeader()
if leader == nil {
leader = &metapb.Peer{}
}
resp.Regions = append(resp.Regions, r.GetMeta())
resp.Leaders = append(resp.Leaders, leader)
}
return resp, nil
}

// AskSplit implements gRPC PDServer.
func (s *Server) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
Expand Down

0 comments on commit 65eb45a

Please sign in to comment.