Skip to content

Commit

Permalink
server, client: add down_peer and pending_peer in region response (#2429
Browse files Browse the repository at this point in the history
) (#2443)

* server: add down_peers and pending_peers in get_region response

Signed-off-by: disksing <i@disksing.com>

* update pd client

Signed-off-by: disksing <i@disksing.com>

* minor update

Signed-off-by: disksing <i@disksing.com>

* minor update

Signed-off-by: disksing <i@disksing.com>

* fix unstable test TestGetRegionByID (#2435)

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: disksing <i@disksing.com>
Co-authored-by: ShuNing <nolouch@gmail.com>
  • Loading branch information
3 people authored May 25, 2020
1 parent 9a52e06 commit 2303e3a
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 85 deletions.
48 changes: 36 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ import (
"go.uber.org/zap"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
}

// Client is a PD (Placement Driver) client.
// It should not be used after calling Close().
type Client interface {
Expand All @@ -44,11 +52,11 @@ type Client interface {
// taking care of region change.
// Also it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error)
GetRegion(ctx context.Context, key []byte) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error)
GetPrevRegion(ctx context.Context, key []byte) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error)
GetRegionByID(ctx context.Context, regionID uint64) (*Region, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
Expand Down Expand Up @@ -423,7 +431,23 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
return resp.Wait()
}

func (c *client) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *client) parseRegionResponse(res *pdpb.GetRegionResponse) *Region {
if res.Region == nil {
return nil
}

r := &Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
}
for _, s := range res.DownPeers {
r.DownPeers = append(r.DownPeers, s.Peer)
}
return r
}

func (c *client) GetRegion(ctx context.Context, key []byte) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -441,12 +465,12 @@ func (c *client) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *me
if err != nil {
cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return resp.GetRegion(), resp.GetLeader(), nil
return c.parseRegionResponse(resp), nil
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -464,12 +488,12 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region,
if err != nil {
cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return resp.GetRegion(), resp.GetLeader(), nil
return c.parseRegionResponse(resp), nil
}

func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -487,9 +511,9 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re
if err != nil {
cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return resp.GetRegion(), resp.GetLeader(), nil
return c.parseRegionResponse(resp), nil
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22 h1:D5EBGKd6o4A0PV0sUaUduPSCShiNi0OwFJmf+xRzpuI=
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
4 changes: 2 additions & 2 deletions server/api/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
cluster := s.svr.GetRaftCluster()

// Update region's epoch to (100, 100).
region := cluster.GetRegionInfoByKey([]byte("foo")).Clone(
region := cluster.GetRegionByKey([]byte("foo")).Clone(
core.SetRegionConfVer(100),
core.SetRegionVersion(100),
)
Expand All @@ -78,7 +78,7 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
err = cluster.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

region = cluster.GetRegionInfoByKey([]byte("foo"))
region = cluster.GetRegionByKey([]byte("foo"))
c.Assert(region.GetRegionEpoch().ConfVer, Equals, uint64(50))
c.Assert(region.GetRegionEpoch().Version, Equals, uint64(50))
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (h *regionHandler) GetRegionByKey(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
regionInfo := rc.GetRegionInfoByKey([]byte(key))
regionInfo := rc.GetRegionByKey([]byte(key))
h.rd.JSON(w, http.StatusOK, NewRegionInfo(regionInfo))
}

Expand Down
32 changes: 5 additions & 27 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,27 +650,14 @@ func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
return nil
}

// GetRegionByKey gets region and leader peer by region key from cluster.
func (c *RaftCluster) GetRegionByKey(regionKey []byte) (*metapb.Region, *metapb.Peer) {
region := c.core.SearchRegion(regionKey)
if region == nil {
return nil, nil
}
return region.GetMeta(), region.GetLeader()
// GetRegionByKey gets regionInfo by region key from cluster.
func (c *RaftCluster) GetRegionByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchRegion(regionKey)
}

// GetPrevRegionByKey gets previous region and leader peer by the region key from cluster.
func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) (*metapb.Region, *metapb.Peer) {
region := c.core.SearchPrevRegion(regionKey)
if region == nil {
return nil, nil
}
return region.GetMeta(), region.GetLeader()
}

// GetRegionInfoByKey gets regionInfo by region key from cluster.
func (c *RaftCluster) GetRegionInfoByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchRegion(regionKey)
func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchPrevRegion(regionKey)
}

// ScanRegions scans region with start key, until the region contains endKey, or
Expand All @@ -679,15 +666,6 @@ func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.Re
return c.core.ScanRange(startKey, endKey, limit)
}

// GetRegionByID gets region and leader peer by regionID from cluster.
func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) {
region := c.GetRegion(regionID)
if region == nil {
return nil, nil
}
return region.GetMeta(), region.GetLeader()
}

// GetRegion searches for a region by ID.
func (c *RaftCluster) GetRegion(regionID uint64) *core.RegionInfo {
return c.core.GetRegion(regionID)
Expand Down
28 changes: 14 additions & 14 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/pd/server/cluster/concurrentRegionHeartbeat"), IsNil)
c.Assert(cluster.processRegionHeartbeat(target), IsNil)
wg.Wait()
checkRegion(c, cluster.GetRegionInfoByKey([]byte{}), target)
checkRegion(c, cluster.GetRegionByKey([]byte{}), target)
}

func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
Expand All @@ -350,23 +350,23 @@ func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
c.Assert(cluster.processRegionHeartbeat(r), IsNil)

checkRegion(c, cluster.GetRegion(r.GetID()), r)
checkRegion(c, cluster.GetRegionInfoByKey(r.GetStartKey()), r)
checkRegion(c, cluster.GetRegionByKey(r.GetStartKey()), r)

if len(r.GetEndKey()) > 0 {
end := r.GetEndKey()[0]
checkRegion(c, cluster.GetRegionInfoByKey([]byte{end - 1}), r)
checkRegion(c, cluster.GetRegionByKey([]byte{end - 1}), r)
}
}

// Check all regions after handling all heartbeats.
for _, r := range regions {
checkRegion(c, cluster.GetRegion(r.GetID()), r)
checkRegion(c, cluster.GetRegionInfoByKey(r.GetStartKey()), r)
checkRegion(c, cluster.GetRegionByKey(r.GetStartKey()), r)

if len(r.GetEndKey()) > 0 {
end := r.GetEndKey()[0]
checkRegion(c, cluster.GetRegionInfoByKey([]byte{end - 1}), r)
result := cluster.GetRegionInfoByKey([]byte{end + 1})
checkRegion(c, cluster.GetRegionByKey([]byte{end - 1}), r)
result := cluster.GetRegionByKey([]byte{end + 1})
c.Assert(result.GetID(), Not(Equals), r.GetID())
}
}
Expand All @@ -380,7 +380,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
// 1: [nil, nil)
region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
c.Assert(cluster.processRegionHeartbeat(region1), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("foo")), region1)
checkRegion(c, cluster.GetRegionByKey([]byte("foo")), region1)

// split 1 to 2: [nil, m) 1: [m, nil), sync 2 first.
region1 = region1.Clone(
Expand All @@ -389,12 +389,12 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
)
region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
c.Assert(cluster.processRegionHeartbeat(region2), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("a")), region2)
checkRegion(c, cluster.GetRegionByKey([]byte("a")), region2)
// [m, nil) is missing before r1's heartbeat.
c.Assert(cluster.GetRegionInfoByKey([]byte("z")), IsNil)
c.Assert(cluster.GetRegionByKey([]byte("z")), IsNil)

c.Assert(cluster.processRegionHeartbeat(region1), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("z")), region1)
checkRegion(c, cluster.GetRegionByKey([]byte("z")), region1)

// split 1 to 3: [m, q) 1: [q, nil), sync 1 first.
region1 = region1.Clone(
Expand All @@ -403,12 +403,12 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
)
region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
c.Assert(cluster.processRegionHeartbeat(region1), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("z")), region1)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("a")), region2)
checkRegion(c, cluster.GetRegionByKey([]byte("z")), region1)
checkRegion(c, cluster.GetRegionByKey([]byte("a")), region2)
// [m, q) is missing before r3's heartbeat.
c.Assert(cluster.GetRegionInfoByKey([]byte("n")), IsNil)
c.Assert(cluster.GetRegionByKey([]byte("n")), IsNil)
c.Assert(cluster.processRegionHeartbeat(region3), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("n")), region3)
checkRegion(c, cluster.GetRegionByKey([]byte("n")), region3)
}

func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
// ValidRequestRegion is used to decide if the region is valid.
func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
startKey := reqRegion.GetStartKey()
region, _ := c.GetRegionByKey(startKey)
region := c.GetRegionByKey(startKey)
if region == nil {
return errors.Errorf("region not found, request region: %v", core.RegionToHexMeta(reqRegion))
}
// If the request epoch is less than current region epoch, then returns an error.
reqRegionEpoch := reqRegion.GetRegionEpoch()
regionEpoch := region.GetRegionEpoch()
regionEpoch := region.GetMeta().GetRegionEpoch()
if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() ||
reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, currenrt: %v", reqRegionEpoch, regionEpoch)
Expand Down
40 changes: 27 additions & 13 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,16 @@ func (s *Server) GetRegion(ctx context.Context, request *pdpb.GetRegionRequest)
if rc == nil {
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
}
region, leader := rc.GetRegionByKey(request.GetRegionKey())
region := rc.GetRegionByKey(request.GetRegionKey())
if region == nil {
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
return &pdpb.GetRegionResponse{
Header: s.header(),
Region: region,
Leader: leader,
Header: s.header(),
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}, nil
}

Expand All @@ -435,11 +440,16 @@ func (s *Server) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionReque
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
}

region, leader := rc.GetPrevRegionByKey(request.GetRegionKey())
region := rc.GetPrevRegionByKey(request.GetRegionKey())
if region == nil {
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
return &pdpb.GetRegionResponse{
Header: s.header(),
Region: region,
Leader: leader,
Header: s.header(),
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}, nil
}

Expand All @@ -453,12 +463,16 @@ func (s *Server) GetRegionByID(ctx context.Context, request *pdpb.GetRegionByIDR
if rc == nil {
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
}
id := request.GetRegionId()
region, leader := rc.GetRegionByID(id)
region := rc.GetRegion(request.GetRegionId())
if region == nil {
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
return &pdpb.GetRegionResponse{
Header: s.header(),
Region: region,
Leader: leader,
Header: s.header(),
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}, nil
}

Expand Down
Loading

0 comments on commit 2303e3a

Please sign in to comment.