Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server, client: add down_peer and pending_peer in region response (#2429) #2443

Merged
merged 6 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ import (
"go.uber.org/zap"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Leader *metapb.Peer
DownPeers []*metapb.Peer
PendingPeers []*metapb.Peer
}

// Client is a PD (Placement Driver) client.
// It should not be used after calling Close().
type Client interface {
Expand All @@ -44,11 +52,11 @@ type Client interface {
// taking care of region change.
// Also it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error)
GetRegion(ctx context.Context, key []byte) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error)
GetPrevRegion(ctx context.Context, key []byte) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error)
GetRegionByID(ctx context.Context, regionID uint64) (*Region, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
Expand Down Expand Up @@ -429,7 +437,23 @@ func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err
return resp.Wait()
}

func (c *client) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *client) parseRegionResponse(res *pdpb.GetRegionResponse) *Region {
if res.Region == nil {
return nil
}

r := &Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
}
for _, s := range res.DownPeers {
r.DownPeers = append(r.DownPeers, s.Peer)
}
return r
}

func (c *client) GetRegion(ctx context.Context, key []byte) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -447,12 +471,12 @@ func (c *client) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *me
if err != nil {
cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return resp.GetRegion(), resp.GetLeader(), nil
return c.parseRegionResponse(resp), nil
}

func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -470,12 +494,12 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region,
if err != nil {
cmdFailDurationGetPrevRegion.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return resp.GetRegion(), resp.GetLeader(), nil
return c.parseRegionResponse(resp), nil
}

func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -493,9 +517,9 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re
if err != nil {
cmdFailedDurationGetRegionByID.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}
return resp.GetRegion(), resp.GetLeader(), nil
return c.parseRegionResponse(resp), nil
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22 h1:D5EBGKd6o4A0PV0sUaUduPSCShiNi0OwFJmf+xRzpuI=
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
4 changes: 2 additions & 2 deletions server/api/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
cluster := s.svr.GetRaftCluster()

// Update region's epoch to (100, 100).
region := cluster.GetRegionInfoByKey([]byte("foo")).Clone(
region := cluster.GetRegionByKey([]byte("foo")).Clone(
core.SetRegionConfVer(100),
core.SetRegionVersion(100),
)
Expand All @@ -78,7 +78,7 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
err = cluster.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

region = cluster.GetRegionInfoByKey([]byte("foo"))
region = cluster.GetRegionByKey([]byte("foo"))
c.Assert(region.GetRegionEpoch().ConfVer, Equals, uint64(50))
c.Assert(region.GetRegionEpoch().Version, Equals, uint64(50))
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (h *regionHandler) GetRegionByKey(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
regionInfo := rc.GetRegionInfoByKey([]byte(key))
regionInfo := rc.GetRegionByKey([]byte(key))
h.rd.JSON(w, http.StatusOK, NewRegionInfo(regionInfo))
}

Expand Down
32 changes: 5 additions & 27 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,27 +650,14 @@ func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
return nil
}

// GetRegionByKey gets region and leader peer by region key from cluster.
func (c *RaftCluster) GetRegionByKey(regionKey []byte) (*metapb.Region, *metapb.Peer) {
region := c.core.SearchRegion(regionKey)
if region == nil {
return nil, nil
}
return region.GetMeta(), region.GetLeader()
// GetRegionByKey gets regionInfo by region key from cluster.
func (c *RaftCluster) GetRegionByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchRegion(regionKey)
}

// GetPrevRegionByKey gets previous region and leader peer by the region key from cluster.
func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) (*metapb.Region, *metapb.Peer) {
region := c.core.SearchPrevRegion(regionKey)
if region == nil {
return nil, nil
}
return region.GetMeta(), region.GetLeader()
}

// GetRegionInfoByKey gets regionInfo by region key from cluster.
func (c *RaftCluster) GetRegionInfoByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchRegion(regionKey)
func (c *RaftCluster) GetPrevRegionByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchPrevRegion(regionKey)
}

// ScanRegions scans region with start key, until the region contains endKey, or
Expand All @@ -679,15 +666,6 @@ func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.Re
return c.core.ScanRange(startKey, endKey, limit)
}

// GetRegionByID gets region and leader peer by regionID from cluster.
func (c *RaftCluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) {
region := c.GetRegion(regionID)
if region == nil {
return nil, nil
}
return region.GetMeta(), region.GetLeader()
}

// GetRegion searches for a region by ID.
func (c *RaftCluster) GetRegion(regionID uint64) *core.RegionInfo {
return c.core.GetRegion(regionID)
Expand Down
28 changes: 14 additions & 14 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/pd/server/cluster/concurrentRegionHeartbeat"), IsNil)
c.Assert(cluster.processRegionHeartbeat(target), IsNil)
wg.Wait()
checkRegion(c, cluster.GetRegionInfoByKey([]byte{}), target)
checkRegion(c, cluster.GetRegionByKey([]byte{}), target)
}

func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
Expand All @@ -350,23 +350,23 @@ func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
c.Assert(cluster.processRegionHeartbeat(r), IsNil)

checkRegion(c, cluster.GetRegion(r.GetID()), r)
checkRegion(c, cluster.GetRegionInfoByKey(r.GetStartKey()), r)
checkRegion(c, cluster.GetRegionByKey(r.GetStartKey()), r)

if len(r.GetEndKey()) > 0 {
end := r.GetEndKey()[0]
checkRegion(c, cluster.GetRegionInfoByKey([]byte{end - 1}), r)
checkRegion(c, cluster.GetRegionByKey([]byte{end - 1}), r)
}
}

// Check all regions after handling all heartbeats.
for _, r := range regions {
checkRegion(c, cluster.GetRegion(r.GetID()), r)
checkRegion(c, cluster.GetRegionInfoByKey(r.GetStartKey()), r)
checkRegion(c, cluster.GetRegionByKey(r.GetStartKey()), r)

if len(r.GetEndKey()) > 0 {
end := r.GetEndKey()[0]
checkRegion(c, cluster.GetRegionInfoByKey([]byte{end - 1}), r)
result := cluster.GetRegionInfoByKey([]byte{end + 1})
checkRegion(c, cluster.GetRegionByKey([]byte{end - 1}), r)
result := cluster.GetRegionByKey([]byte{end + 1})
c.Assert(result.GetID(), Not(Equals), r.GetID())
}
}
Expand All @@ -380,7 +380,7 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
// 1: [nil, nil)
region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
c.Assert(cluster.processRegionHeartbeat(region1), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("foo")), region1)
checkRegion(c, cluster.GetRegionByKey([]byte("foo")), region1)

// split 1 to 2: [nil, m) 1: [m, nil), sync 2 first.
region1 = region1.Clone(
Expand All @@ -389,12 +389,12 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
)
region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
c.Assert(cluster.processRegionHeartbeat(region2), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("a")), region2)
checkRegion(c, cluster.GetRegionByKey([]byte("a")), region2)
// [m, nil) is missing before r1's heartbeat.
c.Assert(cluster.GetRegionInfoByKey([]byte("z")), IsNil)
c.Assert(cluster.GetRegionByKey([]byte("z")), IsNil)

c.Assert(cluster.processRegionHeartbeat(region1), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("z")), region1)
checkRegion(c, cluster.GetRegionByKey([]byte("z")), region1)

// split 1 to 3: [m, q) 1: [q, nil), sync 1 first.
region1 = region1.Clone(
Expand All @@ -403,12 +403,12 @@ func (s *testClusterInfoSuite) TestHeartbeatSplit(c *C) {
)
region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil)
c.Assert(cluster.processRegionHeartbeat(region1), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("z")), region1)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("a")), region2)
checkRegion(c, cluster.GetRegionByKey([]byte("z")), region1)
checkRegion(c, cluster.GetRegionByKey([]byte("a")), region2)
// [m, q) is missing before r3's heartbeat.
c.Assert(cluster.GetRegionInfoByKey([]byte("n")), IsNil)
c.Assert(cluster.GetRegionByKey([]byte("n")), IsNil)
c.Assert(cluster.processRegionHeartbeat(region3), IsNil)
checkRegion(c, cluster.GetRegionInfoByKey([]byte("n")), region3)
checkRegion(c, cluster.GetRegionByKey([]byte("n")), region3)
}

func (s *testClusterInfoSuite) TestRegionSplitAndMerge(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
// ValidRequestRegion is used to decide if the region is valid.
func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
startKey := reqRegion.GetStartKey()
region, _ := c.GetRegionByKey(startKey)
region := c.GetRegionByKey(startKey)
if region == nil {
return errors.Errorf("region not found, request region: %v", core.RegionToHexMeta(reqRegion))
}
// If the request epoch is less than current region epoch, then returns an error.
reqRegionEpoch := reqRegion.GetRegionEpoch()
regionEpoch := region.GetRegionEpoch()
regionEpoch := region.GetMeta().GetRegionEpoch()
if reqRegionEpoch.GetVersion() < regionEpoch.GetVersion() ||
reqRegionEpoch.GetConfVer() < regionEpoch.GetConfVer() {
return errors.Errorf("invalid region epoch, request: %v, currenrt: %v", reqRegionEpoch, regionEpoch)
Expand Down
40 changes: 27 additions & 13 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,16 @@ func (s *Server) GetRegion(ctx context.Context, request *pdpb.GetRegionRequest)
if rc == nil {
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
}
region, leader := rc.GetRegionByKey(request.GetRegionKey())
region := rc.GetRegionByKey(request.GetRegionKey())
if region == nil {
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
return &pdpb.GetRegionResponse{
Header: s.header(),
Region: region,
Leader: leader,
Header: s.header(),
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}, nil
}

Expand All @@ -435,11 +440,16 @@ func (s *Server) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionReque
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
}

region, leader := rc.GetPrevRegionByKey(request.GetRegionKey())
region := rc.GetPrevRegionByKey(request.GetRegionKey())
if region == nil {
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
return &pdpb.GetRegionResponse{
Header: s.header(),
Region: region,
Leader: leader,
Header: s.header(),
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}, nil
}

Expand All @@ -453,12 +463,16 @@ func (s *Server) GetRegionByID(ctx context.Context, request *pdpb.GetRegionByIDR
if rc == nil {
return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil
}
id := request.GetRegionId()
region, leader := rc.GetRegionByID(id)
region := rc.GetRegion(request.GetRegionId())
if region == nil {
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
return &pdpb.GetRegionResponse{
Header: s.header(),
Region: region,
Leader: leader,
Header: s.header(),
Region: region.GetMeta(),
Leader: region.GetLeader(),
DownPeers: region.GetDownPeers(),
PendingPeers: region.GetPendingPeers(),
}, nil
}

Expand Down
Loading