From 65eb45afb61862cf278124ed50dba12bbdda50a0 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 17 May 2019 12:02:14 +0800 Subject: [PATCH] client, server: add ScanRegions gRPC protocol support (#1535) * client, server: support ScanRegions gRPC protocol Signed-off-by: disksing --- client/client.go | 27 ++++++++++++++++++++ client/client_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +-- server/grpc_service.go | 23 +++++++++++++++++ 5 files changed, 111 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index dcc6466cc72..5e00aea837f 100644 --- a/client/client.go +++ b/client/client.go @@ -52,6 +52,11 @@ type Client interface { GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) // GetRegionByID gets a region and its leader Peer from PD by id. GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) + // ScanRegion gets a list of regions, starts from the region that contains key. + // Limit limits the maximum number of regions returned. + // If a region has no leader, corresponding leader will be placed by a peer + // with empty value (PeerID is 0). + ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -681,6 +686,28 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re return resp.GetRegion(), resp.GetLeader(), nil } +func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer cmdDuration.WithLabelValues("scan_regions").Observe(time.Since(start).Seconds()) + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + resp, err := c.leaderClient().ScanRegions(ctx, &pdpb.ScanRegionsRequest{ + Header: c.requestHeader(), + StartKey: key, + Limit: int32(limit), + }) + cancel() + if err != nil { + cmdFailedDuration.WithLabelValues("scan_regions").Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return nil, nil, errors.WithStack(err) + } + return resp.GetRegions(), resp.GetLeaders(), nil +} + func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetStore", opentracing.ChildOf(span.Context())) diff --git a/client/client_test.go b/client/client_test.go index 03fa97b036b..2ab45e970dc 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -260,6 +260,64 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) { c.Succeed() } +func (s *testClientSuite) TestScanRegions(c *C) { + regionLen := 10 + regions := make([]*metapb.Region, 0, regionLen) + for i := 0; i < regionLen; i++ { + regionID := regionIDAllocator.alloc() + r := &metapb.Region{ + Id: regionID, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + Peers: peers, + } + regions = append(regions, r) + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(s.srv), + Region: r, + Leader: peers[0], + } + err := s.regionHeartbeat.Send(req) + c.Assert(err, IsNil) + } + + // Wait for region heartbeats. + testutil.WaitUntil(c, func(c *C) bool { + scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, 10) + return err == nil && len(scanRegions) == 10 + }) + + // Set leader of region3 to nil. + region3 := core.NewRegionInfo(regions[3], nil) + s.srv.GetRaftCluster().HandleRegionHeartbeat(region3) + + check := func(start []byte, limit int, expect []*metapb.Region) { + scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, limit) + c.Assert(err, IsNil) + c.Assert(scanRegions, HasLen, len(expect)) + c.Assert(leaders, HasLen, len(expect)) + c.Log("scanRegions", scanRegions) + c.Log("expect", expect) + c.Log("scanLeaders", leaders) + for i := range expect { + c.Assert(scanRegions[i], DeepEquals, expect[i]) + if scanRegions[i].GetId() == region3.GetID() { + c.Assert(leaders[i], DeepEquals, &metapb.Peer{}) + } else { + c.Assert(leaders[i], DeepEquals, expect[i].Peers[0]) + } + } + } + + check([]byte{0}, 10, regions) + check([]byte{1}, 5, regions[1:6]) + check([]byte{100}, 1, nil) +} + func (s *testClientSuite) TestGetRegionByID(c *C) { regionID := regionIDAllocator.alloc() region := &metapb.Region{ diff --git a/go.mod b/go.mod index 8e643602226..3d81846c830 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 github.com/pingcap/errors v0.10.1 // indirect github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c - github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d + github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.8.0 diff --git a/go.sum b/go.sum index 59c8c800ee7..f5666357e1c 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/pingcap/errors v0.10.1 h1:fGVuPMtwNcxbzQ3aoRyyi6kxvXKMkEsceP81f3b8wsk github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7xvRV6DzvPkKY4QXzfVbjU1BhW0d9yL8= github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= -github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d h1:LJYJl+cBhkkTWD79n+n9Bp4agQ85SdF9YKY4zEtL9Kw= -github.com/pingcap/kvproto v0.0.0-20190327032727-3d8cb3a30d5d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c h1:pY/MQQ5UajEHfSnQS8rFAM9gw9bBKzqBl414cdfhpRQ= +github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7 h1:kOHAMalwF69bJrtWrOdVaCSvZjLucrJhP4NQKIu6uM4= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/server/grpc_service.go b/server/grpc_service.go index ba78d697e2f..a5b249d4ef9 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -447,6 +447,29 @@ func (s *Server) GetRegionByID(ctx context.Context, request *pdpb.GetRegionByIDR }, nil } +// ScanRegions implements gRPC PDServer. +func (s *Server) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsRequest) (*pdpb.ScanRegionsResponse, error) { + if err := s.validateRequest(request.GetHeader()); err != nil { + return nil, err + } + + cluster := s.GetRaftCluster() + if cluster == nil { + return &pdpb.ScanRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } + regions := cluster.ScanRegionsByKey(request.GetStartKey(), int(request.GetLimit())) + resp := &pdpb.ScanRegionsResponse{Header: s.header()} + for _, r := range regions { + leader := r.GetLeader() + if leader == nil { + leader = &metapb.Peer{} + } + resp.Regions = append(resp.Regions, r.GetMeta()) + resp.Leaders = append(resp.Leaders, leader) + } + return resp, nil +} + // AskSplit implements gRPC PDServer. func (s *Server) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { if err := s.validateRequest(request.GetHeader()); err != nil {