Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support setting endKey for ScanRange #1700

Merged
merged 7 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Client interface {
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -689,7 +689,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Re
return resp.GetRegion(), resp.GetLeader(), nil
}

func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*metapb.Region, []*metapb.Peer, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -700,6 +700,7 @@ func (c *client) ScanRegions(ctx context.Context, key []byte, limit int) ([]*met
resp, err := c.leaderClient().ScanRegions(ctx, &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
EndKey: endKey,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like line 703 is a better place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Limit: int32(limit),
})
cancel()
Expand Down
14 changes: 8 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,16 @@ func (s *testClientSuite) TestScanRegions(c *C) {

// Wait for region heartbeats.
testutil.WaitUntil(c, func(c *C) bool {
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, 10)
scanRegions, _, err := s.client.ScanRegions(context.Background(), []byte{0}, nil, 10)
return err == nil && len(scanRegions) == 10
})

// Set leader of region3 to nil.
region3 := core.NewRegionInfo(regions[3], nil)
s.srv.GetRaftCluster().HandleRegionHeartbeat(region3)

check := func(start []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, limit)
check := func(start, end []byte, limit int, expect []*metapb.Region) {
scanRegions, leaders, err := s.client.ScanRegions(context.Background(), start, end, limit)
c.Assert(err, IsNil)
c.Assert(scanRegions, HasLen, len(expect))
c.Assert(leaders, HasLen, len(expect))
Expand All @@ -305,9 +305,11 @@ func (s *testClientSuite) TestScanRegions(c *C) {
}
}

check([]byte{0}, 10, regions)
check([]byte{1}, 5, regions[1:6])
check([]byte{100}, 1, nil)
check([]byte{0}, nil, 10, regions)
check([]byte{1}, nil, 5, regions[1:6])
check([]byte{100}, nil, 1, nil)
check([]byte{1}, []byte{6}, 0, regions[1:6])
check([]byte{1}, []byte{6}, 2, regions[1:3])
}

func (s *testClientSuite) TestGetRegionByID(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7xvRV6DzvPkKY4QXzfVbjU1BhW0d9yL8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c h1:pY/MQQ5UajEHfSnQS8rFAM9gw9bBKzqBl414cdfhpRQ=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7 h1:tt24R4cv6GlvnmvkHNC1OrS/ETvXxbJkJ1Nrk4prtWo=
github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (mc *Cluster) allocID() (uint64, error) {
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
return mc.Regions.ScanRange(startKey, limit)
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.Regions.ScanRange(startKey, endKey, limit)
}

// LoadRegion puts region info without leader
Expand Down
4 changes: 2 additions & 2 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (h *regionsHandler) GetAll(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, regionsInfo)
}

func (h *regionsHandler) ScanRegionsByKey(w http.ResponseWriter, r *http.Request) {
func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
h.rd.JSON(w, http.StatusInternalServerError, server.ErrNotBootstrapped.Error())
Expand All @@ -170,7 +170,7 @@ func (h *regionsHandler) ScanRegionsByKey(w http.ResponseWriter, r *http.Request
if limit > maxRegionLimit {
limit = maxRegionLimit
}
regions := cluster.ScanRegionsByKey([]byte(startKey), limit)
regions := cluster.ScanRegions([]byte(startKey), nil, limit)
regionsInfo := convertToAPIRegions(regions)
h.rd.JSON(w, http.StatusOK, regionsInfo)
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

regionsHandler := newRegionsHandler(svr, rd)
router.HandleFunc("/api/v1/regions", regionsHandler.GetAll).Methods("GET")
router.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegionsByKey).Methods("GET")
router.HandleFunc("/api/v1/regions/key", regionsHandler.ScanRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET")
router.HandleFunc("/api/v1/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET")
Expand Down
19 changes: 5 additions & 14 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,19 +557,10 @@ func (c *RaftCluster) GetRegionInfoByKey(regionKey []byte) *core.RegionInfo {
return c.core.SearchRegion(regionKey)
}

// ScanRegionsByKey scans region with start key, until number greater than limit.
func (c *RaftCluster) ScanRegionsByKey(startKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRange(startKey, limit)
}

// ScanRegions scans region with start key, until number greater than limit.
func (c *RaftCluster) ScanRegions(startKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRange(startKey, limit)
}

// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (c *RaftCluster) ScanRangeWithEndKey(startKey, endKey []byte) []*core.RegionInfo {
return c.core.ScanRangeWithEndKey(startKey, endKey)
// ScanRegions scans region with start key, until the region contains endKey, or
// total number greater than limit.
func (c *RaftCluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return c.core.ScanRange(startKey, endKey, limit)
}

// GetRegionByID gets region and leader peer by regionID from cluster.
Expand Down Expand Up @@ -660,7 +651,7 @@ func (c *RaftCluster) GetAverageRegionSize() int64 {
func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats {
c.RLock()
defer c.RUnlock()
return statistics.GetRegionStats(c.core.ScanRangeWithEndKey, startKey, endKey)
return statistics.GetRegionStats(c.core.ScanRange(startKey, endKey, -1))
}

// GetStoresStats returns stores' statistics from cluster.
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *coordinator) patrolRegions() {
return
}

regions := c.cluster.ScanRegions(key, patrolScanRegionLimit)
regions := c.cluster.ScanRegions(key, nil, patrolScanRegionLimit)
if len(regions) == 0 {
// Resets the scan key.
key = nil
Expand Down
17 changes: 5 additions & 12 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,12 @@ func (bc *BasicCluster) SearchPrevRegion(regionKey []byte) *RegionInfo {
return bc.Regions.SearchPrevRegion(regionKey)
}

// ScanRange scans from the first region containing or behind start key,
// until number greater than limit.
func (bc *BasicCluster) ScanRange(startKey []byte, limit int) []*RegionInfo {
// ScanRange scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (bc *BasicCluster) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.ScanRange(startKey, limit)
}

// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (bc *BasicCluster) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.ScanRangeWithEndKey(startKey, endKey)
return bc.Regions.ScanRange(startKey, endKey, limit)
}

// GetOverlaps returns the regions which are overlapped with the specified region range.
Expand All @@ -322,7 +315,7 @@ type RegionSetInformer interface {
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *RegionInfo
GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo)
ScanRegions(startKey []byte, limit int) []*RegionInfo
ScanRegions(startKey, endKey []byte, limit int) []*RegionInfo
}

// StoreSetInformer provides access to a shared informer of stores.
Expand Down
25 changes: 8 additions & 17 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,30 +709,21 @@ func (r *RegionsInfo) GetFollower(storeID uint64, regionID uint64) *RegionInfo {
return r.followers[storeID].Get(regionID)
}

// ScanRange scans from the first region containing or behind start key,
// until number greater than limit.
func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
res := make([]*RegionInfo, 0, limit)
r.tree.scanRange(startKey, func(metaRegion *metapb.Region) bool {
res = append(res, r.GetRegion(metaRegion.GetId()))
return len(res) < limit
})
return res
}

// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo {
var regions []*RegionInfo
// ScanRange scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
var res []*RegionInfo
r.tree.scanRange(startKey, func(meta *metapb.Region) bool {
if len(endKey) > 0 && bytes.Compare(meta.StartKey, endKey) >= 0 {
return false
}
if region := r.GetRegion(meta.GetId()); region != nil {
regions = append(regions, region)
if limit > 0 && len(res) >= limit {
return false
}
res = append(res, r.GetRegion(meta.GetId()))
Luffbee marked this conversation as resolved.
Show resolved Hide resolved
return true
})
return regions
return res
}

// ScanRangeWithIterator scans from the first region containing or behind start key,
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (s *Server) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsReque
if cluster == nil {
return &pdpb.ScanRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
regions := cluster.ScanRegionsByKey(request.GetStartKey(), int(request.GetLimit()))
regions := cluster.ScanRegions(request.GetStartKey(), request.GetEndKey(), int(request.GetLimit()))
resp := &pdpb.ScanRegionsResponse{Header: s.header()}
for _, r := range regions {
leader := r.GetLeader()
Expand Down
24 changes: 2 additions & 22 deletions server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package schedule

import (
"bytes"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
Expand All @@ -34,26 +32,8 @@ const scanLimit = 128
// The cluster can only know the regions within [startKey, endKey].
func GenRangeCluster(cluster Cluster, startKey, endKey []byte) *RangeCluster {
regions := core.NewRegionsInfo()
scanKey := startKey
loopEnd := false
for !loopEnd {
collect := cluster.ScanRegions(scanKey, scanLimit)
if len(collect) == 0 {
break
}
for _, r := range collect {
if bytes.Compare(r.GetStartKey(), endKey) < 0 {
regions.SetRegion(r)
} else {
loopEnd = true
break
}
if string(r.GetEndKey()) == "" {
loopEnd = true
break
}
scanKey = r.GetEndKey()
}
for _, r := range cluster.ScanRegions(startKey, endKey, -1) {
regions.AddRegion(r)
}
return &RangeCluster{
Cluster: cluster,
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/adjacent_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (l *balanceAdjacentRegionScheduler) Schedule(cluster schedule.Cluster) []*o
}

l.cacheRegions.clear()
regions := cluster.ScanRegions(l.lastKey, scanLimit)
regions := cluster.ScanRegions(l.lastKey, nil, scanLimit)
// scan to the end
if len(regions) <= 1 {
schedulerStatus.WithLabelValues(l.GetName(), "adjacent_count").Set(float64(l.adjacentRegionsCount))
Expand Down
6 changes: 2 additions & 4 deletions server/statistics/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,9 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
}
}

// GetRegionStats scans regions that intersect range [startKey, endKey) and sums up
// their statistics.
func GetRegionStats(f func(startKey, endKey []byte) []*core.RegionInfo, startKey, endKey []byte) *RegionStats {
// GetRegionStats sums regions' statistics.
func GetRegionStats(regions []*core.RegionInfo) *RegionStats {
stats := newRegionStats()
regions := f(startKey, endKey)
for _, region := range regions {
stats.Observe(region)
}
Expand Down