From 2303e3a5b8dc93f25f814e860c25d64050563552 Mon Sep 17 00:00:00 2001 From: pingcap-github-bot Date: Mon, 25 May 2020 17:56:23 +0800 Subject: [PATCH] server, client: add down_peer and pending_peer in region response (#2429) (#2443) * server: add down_peers and pending_peers in get_region response Signed-off-by: disksing * update pd client Signed-off-by: disksing * minor update Signed-off-by: disksing * minor update Signed-off-by: disksing * fix unstable test TestGetRegionByID (#2435) Signed-off-by: nolouch Co-authored-by: disksing Co-authored-by: ShuNing --- client/client.go | 48 ++++++++++++++++++++++++-------- go.mod | 2 +- go.sum | 4 +-- server/api/admin_test.go | 4 +-- server/api/region.go | 2 +- server/cluster/cluster.go | 32 ++++----------------- server/cluster/cluster_test.go | 28 +++++++++---------- server/cluster/cluster_worker.go | 4 +-- server/grpc_service.go | 40 +++++++++++++++++--------- tests/client/client_test.go | 28 +++++++++++-------- 10 files changed, 107 insertions(+), 85 deletions(-) diff --git a/client/client.go b/client/client.go index cc3278c79b7..0ecce28eaaf 100644 --- a/client/client.go +++ b/client/client.go @@ -27,6 +27,14 @@ import ( "go.uber.org/zap" ) +// Region contains information of a region's meta and its peers. +type Region struct { + Meta *metapb.Region + Leader *metapb.Peer + DownPeers []*metapb.Peer + PendingPeers []*metapb.Peer +} + // Client is a PD (Placement Driver) client. // It should not be used after calling Close(). type Client interface { @@ -44,11 +52,11 @@ type Client interface { // taking care of region change. // Also it may return nil if PD finds no Region for the key temporarily, // client should retry later. - GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) + GetRegion(ctx context.Context, key []byte) (*Region, error) // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. - GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) + GetPrevRegion(ctx context.Context, key []byte) (*Region, error) // GetRegionByID gets a region and its leader Peer from PD by id. - GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) + GetRegionByID(ctx context.Context, regionID uint64) (*Region, error) // ScanRegion gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer @@ -423,7 +431,23 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err return resp.Wait() } -func (c *client) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) { +func (c *client) parseRegionResponse(res *pdpb.GetRegionResponse) *Region { + if res.Region == nil { + return nil + } + + r := &Region{ + Meta: res.Region, + Leader: res.Leader, + PendingPeers: res.PendingPeers, + } + for _, s := range res.DownPeers { + r.DownPeers = append(r.DownPeers, s.Peer) + } + return r +} + +func (c *client) GetRegion(ctx context.Context, key []byte) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -441,12 +465,12 @@ func (c *client) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *me if err != nil { cmdFailDurationGetRegion.Observe(time.Since(start).Seconds()) c.ScheduleCheckLeader() - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } - return resp.GetRegion(), resp.GetLeader(), nil + return c.parseRegionResponse(resp), nil } -func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) { +func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -464,12 +488,12 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, if err != nil { cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds()) c.ScheduleCheckLeader() - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } - return resp.GetRegion(), resp.GetLeader(), nil + return c.parseRegionResponse(resp), nil } -func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) { +func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -487,9 +511,9 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re if err != nil { cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds()) c.ScheduleCheckLeader() - return nil, nil, errors.WithStack(err) + return nil, errors.WithStack(err) } - return resp.GetRegion(), resp.GetLeader(), nil + return c.parseRegionResponse(resp), nil } func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) { diff --git a/go.mod b/go.mod index fa6d278b490..3f4118bd5ce 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d - github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22 + github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index ddf9dcb3ad0..377c8f155c6 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22 h1:D5EBGKd6o4A0PV0sUaUduPSCShiNi0OwFJmf+xRzpuI= -github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4= +github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= diff --git a/server/api/admin_test.go b/server/api/admin_test.go index 5718aabf203..6e24e8b4e7d 100644 --- a/server/api/admin_test.go +++ b/server/api/admin_test.go @@ -52,7 +52,7 @@ func (s *testAdminSuite) TestDropRegion(c *C) { cluster := s.svr.GetRaftCluster() // Update region's epoch to (100, 100). - region := cluster.GetRegionInfoByKey([]byte("foo")).Clone( + region := cluster.GetRegionByKey([]byte("foo")).Clone( core.SetRegionConfVer(100), core.SetRegionVersion(100), ) @@ -78,7 +78,7 @@ func (s *testAdminSuite) TestDropRegion(c *C) { err = cluster.HandleRegionHeartbeat(region) c.Assert(err, IsNil) - region = cluster.GetRegionInfoByKey([]byte("foo")) + region = cluster.GetRegionByKey([]byte("foo")) c.Assert(region.GetRegionEpoch().ConfVer, Equals, uint64(50)) c.Assert(region.GetRegionEpoch().Version, Equals, uint64(50)) } diff --git a/server/api/region.go b/server/api/region.go index cf675d7ecb0..7d2b632dcbb 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -151,7 +151,7 @@ func (h *regionHandler) GetRegionByKey(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - regionInfo := rc.GetRegionInfoByKey([]byte(key)) + regionInfo := rc.GetRegionByKey([]byte(key)) h.rd.JSON(w, http.StatusOK, NewRegionInfo(regionInfo)) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index d96ba369a69..8df117597ed 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -650,27 +650,14 @@ func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error { return nil } -// GetRegionByKey gets region and leader peer by region key from cluster. -func (c *RaftCluster) GetRegionByKey(regionKey []byte) (*metapb.Region, *metapb.Peer) { - region := c.core.SearchRegion(regionKey) - if region == nil { - return nil, nil - } - return region.GetMeta(), region.GetLeader() +// GetRegionByKey gets regionInfo by region key from cluster. +func (c *RaftCluster) GetRegionByKey(regionKey []byte) *core.RegionInfo { + return c.core.SearchRegion(regionKey) } // GetPrevRegionByKey gets previous region and leader peer by the region key from cluster. -func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) (*metapb.Region, *metapb.Peer) { - region := c.core.SearchPrevRegion(regionKey) - if region == nil { - return nil, nil - } - return region.GetMeta(), region.GetLeader() -} - -// GetRegionInfoByKey gets regionInfo by region key from cluster. -func (c *RaftCluster) GetRegionInfoByKey(regionKey []byte) *core.RegionInfo { - return c.core.SearchRegion(regionKey) +func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) *core.RegionInfo { + return c.core.SearchPrevRegion(regionKey) } // ScanRegions scans region with start key, until the region contains endKey, or @@ -679,15 +666,6 @@ func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.Re return c.core.ScanRange(startKey, endKey, limit) } -// GetRegionByID gets region and leader peer by regionID from cluster. -func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) { - region := c.GetRegion(regionID) - if region == nil { - return nil, nil - } - return region.GetMeta(), region.GetLeader() -} - // GetRegion searches for a region by ID. func (c *RaftCluster) GetRegion(regionID uint64) *core.RegionInfo { return c.core.GetRegion(regionID) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index aa15a12b852..a9ae9afb21c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -341,7 +341,7 @@ func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/pd/server/cluster/concurrentRegionHeartbeat"), IsNil) c.Assert(cluster.processRegionHeartbeat(target), IsNil) wg.Wait() - checkRegion(c, cluster.GetRegionInfoByKey([]byte{}), target) + checkRegion(c, cluster.GetRegionByKey([]byte{}), target) } func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) { @@ -350,23 +350,23 @@ func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) { c.Assert(cluster.processRegionHeartbeat(r), IsNil) checkRegion(c, cluster.GetRegion(r.GetID()), r) - checkRegion(c, cluster.GetRegionInfoByKey(r.GetStartKey()), r) + checkRegion(c, cluster.GetRegionByKey(r.GetStartKey()), r) if len(r.GetEndKey()) > 0 { end := r.GetEndKey()[0] - checkRegion(c, cluster.GetRegionInfoByKey([]byte{end - 1}), r) + checkRegion(c, cluster.GetRegionByKey([]byte{end - 1}), r) } } // Check all regions after handling all heartbeats. for _, r := range regions { checkRegion(c, cluster.GetRegion(r.GetID()), r) - checkRegion(c, cluster.GetRegionInfoByKey(r.GetStartKey()), r) + checkRegion(c, cluster.GetRegionByKey(r.GetStartKey()), r) if len(r.GetEndKey()) > 0 { end := r.GetEndKey()[0] - checkRegion(c, cluster.GetRegionInfoByKey([]byte{end - 1}), r) - result := cluster.GetRegionInfoByKey([]byte{end + 1}) + checkRegion(c, cluster.GetRegionByKey([]byte{end - 1}), r) + result := cluster.GetRegionByKey([]byte{end + 1}) c.Assert(result.GetID(), Not(Equals), r.GetID()) } } @@ -380,7 +380,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { // 1: [nil, nil) region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) c.Assert(cluster.processRegionHeartbeat(region1), IsNil) - checkRegion(c, cluster.GetRegionInfoByKey([]byte("foo")), region1) + checkRegion(c, cluster.GetRegionByKey([]byte("foo")), region1) // split 1 to 2: [nil, m) 1: [m, nil), sync 2 first. region1 = region1.Clone( @@ -389,12 +389,12 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { ) region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) c.Assert(cluster.processRegionHeartbeat(region2), IsNil) - checkRegion(c, cluster.GetRegionInfoByKey([]byte("a")), region2) + checkRegion(c, cluster.GetRegionByKey([]byte("a")), region2) // [m, nil) is missing before r1's heartbeat. - c.Assert(cluster.GetRegionInfoByKey([]byte("z")), IsNil) + c.Assert(cluster.GetRegionByKey([]byte("z")), IsNil) c.Assert(cluster.processRegionHeartbeat(region1), IsNil) - checkRegion(c, cluster.GetRegionInfoByKey([]byte("z")), region1) + checkRegion(c, cluster.GetRegionByKey([]byte("z")), region1) // split 1 to 3: [m, q) 1: [q, nil), sync 1 first. region1 = region1.Clone( @@ -403,12 +403,12 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) { ) region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) c.Assert(cluster.processRegionHeartbeat(region1), IsNil) - checkRegion(c, cluster.GetRegionInfoByKey([]byte("z")), region1) - checkRegion(c, cluster.GetRegionInfoByKey([]byte("a")), region2) + checkRegion(c, cluster.GetRegionByKey([]byte("z")), region1) + checkRegion(c, cluster.GetRegionByKey([]byte("a")), region2) // [m, q) is missing before r3's heartbeat. - c.Assert(cluster.GetRegionInfoByKey([]byte("n")), IsNil) + c.Assert(cluster.GetRegionByKey([]byte("n")), IsNil) c.Assert(cluster.processRegionHeartbeat(region3), IsNil) - checkRegion(c, cluster.GetRegionInfoByKey([]byte("n")), region3) + checkRegion(c, cluster.GetRegionByKey([]byte("n")), region3) } func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 89250cdd3fd..464c42453bf 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -84,13 +84,13 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp // ValidRequestRegion is used to decide if the region is valid. func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error { startKey := reqRegion.GetStartKey() - region, _ := c.GetRegionByKey(startKey) + region := c.GetRegionByKey(startKey) if region == nil { return errors.Errorf("region not found, request region: %v", core.RegionToHexMeta(reqRegion)) } // If the request epoch is less than current region epoch, then returns an error. reqRegionEpoch := reqRegion.GetRegionEpoch() - regionEpoch := region.GetRegionEpoch() + regionEpoch := region.GetMeta().GetRegionEpoch() if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() || reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() { return errors.Errorf("invalid region epoch, request: %v, currenrt: %v", reqRegionEpoch, regionEpoch) diff --git a/server/grpc_service.go b/server/grpc_service.go index 44fd724e7a9..4256493e3da 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -416,11 +416,16 @@ func (s *Server) GetRegion(ctx context.Context, request *pdpb.GetRegionRequest) if rc == nil { return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - region, leader := rc.GetRegionByKey(request.GetRegionKey()) + region := rc.GetRegionByKey(request.GetRegionKey()) + if region == nil { + return &pdpb.GetRegionResponse{Header: s.header()}, nil + } return &pdpb.GetRegionResponse{ - Header: s.header(), - Region: region, - Leader: leader, + Header: s.header(), + Region: region.GetMeta(), + Leader: region.GetLeader(), + DownPeers: region.GetDownPeers(), + PendingPeers: region.GetPendingPeers(), }, nil } @@ -435,11 +440,16 @@ func (s *Server) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionReque return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - region, leader := rc.GetPrevRegionByKey(request.GetRegionKey()) + region := rc.GetPrevRegionByKey(request.GetRegionKey()) + if region == nil { + return &pdpb.GetRegionResponse{Header: s.header()}, nil + } return &pdpb.GetRegionResponse{ - Header: s.header(), - Region: region, - Leader: leader, + Header: s.header(), + Region: region.GetMeta(), + Leader: region.GetLeader(), + DownPeers: region.GetDownPeers(), + PendingPeers: region.GetPendingPeers(), }, nil } @@ -453,12 +463,16 @@ func (s *Server) GetRegionByID(ctx context.Context, request *pdpb.GetRegionByIDR if rc == nil { return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - id := request.GetRegionId() - region, leader := rc.GetRegionByID(id) + region := rc.GetRegion(request.GetRegionId()) + if region == nil { + return &pdpb.GetRegionResponse{Header: s.header()}, nil + } return &pdpb.GetRegionResponse{ - Header: s.header(), - Region: region, - Leader: leader, + Header: s.header(), + Region: region.GetMeta(), + Leader: region.GetLeader(), + DownPeers: region.GetDownPeers(), + PendingPeers: region.GetPendingPeers(), }, nil } diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 258a0adb4e8..70e8c75e644 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -370,10 +370,13 @@ func (s *testClientSuite) TestGetRegion(c *C) { c.Assert(err, IsNil) testutil.WaitUntil(c, func(c *C) bool { - r, leader, err := s.client.GetRegion(context.Background(), []byte("a")) + r, err := s.client.GetRegion(context.Background(), []byte("a")) c.Assert(err, IsNil) - return c.Check(r, DeepEquals, region) && - c.Check(leader, DeepEquals, peers[0]) + if r == nil { + return false + } + return c.Check(r.Meta, DeepEquals, region) && + c.Check(r.Leader, DeepEquals, peers[0]) }) c.Succeed() } @@ -402,16 +405,16 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) { err := s.regionHeartbeat.Send(req) c.Assert(err, IsNil) } + time.Sleep(500 * time.Millisecond) for i := 0; i < 20; i++ { testutil.WaitUntil(c, func(c *C) bool { - r, leader, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)}) + r, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)}) c.Assert(err, IsNil) if i > 0 && i < regionLen { - return c.Check(leader, DeepEquals, peers[0]) && - c.Check(r, DeepEquals, regions[i-1]) + return c.Check(r.Leader, DeepEquals, peers[0]) && + c.Check(r.Meta, DeepEquals, regions[i-1]) } - return c.Check(leader, IsNil) && - c.Check(r, IsNil) + return c.Check(r, IsNil) }) } c.Succeed() @@ -496,10 +499,13 @@ func (s *testClientSuite) TestGetRegionByID(c *C) { c.Assert(err, IsNil) testutil.WaitUntil(c, func(c *C) bool { - r, leader, err := s.client.GetRegionByID(context.Background(), regionID) + r, err := s.client.GetRegionByID(context.Background(), regionID) c.Assert(err, IsNil) - return c.Check(r, DeepEquals, region) && - c.Check(leader, DeepEquals, peers[0]) + if r == nil { + return false + } + return c.Check(r.Meta, DeepEquals, region) && + c.Check(r.Leader, DeepEquals, peers[0]) }) c.Succeed() }