Skip to content

Commit

Permalink
Merge branch 'master' into remove-clone
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Sep 5, 2018
2 parents 643fccd + 0d99adc commit acf87a2
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 25 deletions.
52 changes: 51 additions & 1 deletion pdctl/command/region_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
regionsCheckPrefix = "pd/api/v1/regions/check"
regionsWriteflowPrefix = "pd/api/v1/regions/writeflow"
regionsReadflowPrefix = "pd/api/v1/regions/readflow"
regionsConfVerPrefix = "pd/api/v1/regions/confver"
regionsVersionPrefix = "pd/api/v1/regions/version"
regionsSiblingPrefix = "pd/api/v1/regions/sibling"
regionIDPrefix = "pd/api/v1/region/id"
regionKeyPrefix = "pd/api/v1/region/key"
Expand Down Expand Up @@ -60,6 +62,20 @@ func NewRegionCommand() *cobra.Command {
Run: showRegionTopWriteCommandFunc,
}
r.AddCommand(topWrite)

topConfVer := &cobra.Command{
Use: "topconfver <limit>",
Short: "show regions with top conf version",
Run: showRegionTopConfVerCommandFunc,
}
r.AddCommand(topConfVer)

topVersion := &cobra.Command{
Use: "topversion <limit>",
Short: "show regions with top version",
Run: showRegionTopVersionCommandFunc,
}
r.AddCommand(topVersion)
r.Flags().String("jq", "", "jq query")

return r
Expand Down Expand Up @@ -121,7 +137,41 @@ func showRegionTopReadCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(r)
}

// NewRegionWithKeyCommand returns a region with key subcommand of regionCmd
func showRegionTopConfVerCommandFunc(cmd *cobra.Command, args []string) {
prefix := regionsConfVerPrefix
if len(args) == 1 {
if _, err := strconv.Atoi(args[0]); err != nil {
fmt.Println("limit should be a number")
return
}
prefix += "?limit=" + args[0]
}
r, err := doRequest(cmd, prefix, http.MethodGet)
if err != nil {
fmt.Printf("Failed to get regions: %s\n", err)
return
}
fmt.Println(r)
}

func showRegionTopVersionCommandFunc(cmd *cobra.Command, args []string) {
prefix := regionsVersionPrefix
if len(args) == 1 {
if _, err := strconv.Atoi(args[0]); err != nil {
fmt.Println("limit should be a number")
return
}
prefix += "?limit=" + args[0]
}
r, err := doRequest(cmd, prefix, http.MethodGet)
if err != nil {
fmt.Printf("Failed to get regions: %s\n", err)
return
}
fmt.Println(r)
}

// NewRegionWithKeyCommand return a region with key subcommand of regionCmd
func NewRegionWithKeyCommand() *cobra.Command {
r := &cobra.Command{
Use: "key [--format=raw|pb|proto|protobuf] <key>",
Expand Down
32 changes: 32 additions & 0 deletions server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,38 @@ types:
description: The input is invalid.
500:
description: PD server failed to proceed the request.
/confver:
get:
description: List regions with the largest conf version.
queryParameters:
limit?:
type: integer
default: 16
responses:
200:
body:
application/json:
type: Regions
400:
description: The input is invalid.
500:
description: PD server failed to proceed the request.
/version:
get:
description: List regions with the largest version.
queryParameters:
limit?:
type: integer
default: 16
responses:
200:
body:
application/json:
type: Regions
400:
description: The input is invalid.
500:
description: PD server failed to proceed the request.
/check/{filter}:
uriParameters:
filter:
Expand Down
12 changes: 12 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,18 @@ func (h *regionsHandler) GetTopReadFlow(w http.ResponseWriter, r *http.Request)
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() })
}

func (h *regionsHandler) GetTopConfVer(w http.ResponseWriter, r *http.Request) {
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
})
}

func (h *regionsHandler) GetTopVersion(w http.ResponseWriter, r *http.Request) {
h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion()
})
}

func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) {
cluster := h.svr.GetRaftCluster()
if cluster == nil {
Expand Down
27 changes: 16 additions & 11 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ func newTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...core
StoreId: storeID,
}
metaRegion := &metapb.Region{
Id: regionID,
StartKey: start,
EndKey: end,
Peers: []*metapb.Peer{leader},
Id: regionID,
StartKey: start,
EndKey: end,
Peers: []*metapb.Peer{leader},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}
newOpts := []core.RegionCreateOption{
core.SetApproximateKeys(10),
Expand Down Expand Up @@ -120,18 +121,22 @@ func (s *testRegionSuite) TestStoreRegions(c *C) {
}

func (s *testRegionSuite) TestTopFlow(c *C) {
r1 := newTestRegionInfo(1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000))
r1 := newTestRegionInfo(1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
mustRegionHeartbeat(c, s.svr, r1)
r2 := newTestRegionInfo(2, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0))
r2 := newTestRegionInfo(2, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
mustRegionHeartbeat(c, s.svr, r2)
r3 := newTestRegionInfo(3, 1, []byte("c"), []byte("d"), core.SetWrittenBytes(500), core.SetReadBytes(800))
r3 := newTestRegionInfo(3, 1, []byte("c"), []byte("d"), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
mustRegionHeartbeat(c, s.svr, r3)
s.checkTopFlow(c, fmt.Sprintf("%s/regions/writeflow", s.urlPrefix), []uint64{2, 1, 3})
s.checkTopFlow(c, fmt.Sprintf("%s/regions/readflow", s.urlPrefix), []uint64{1, 3, 2})
s.checkTopFlow(c, fmt.Sprintf("%s/regions/writeflow?limit=2", s.urlPrefix), []uint64{2, 1})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/writeflow", s.urlPrefix), []uint64{2, 1, 3})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/readflow", s.urlPrefix), []uint64{1, 3, 2})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/writeflow?limit=2", s.urlPrefix), []uint64{2, 1})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/confver", s.urlPrefix), []uint64{3, 2, 1})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/confver?limit=2", s.urlPrefix), []uint64{3, 2})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/version", s.urlPrefix), []uint64{2, 3, 1})
s.checkTopRegions(c, fmt.Sprintf("%s/regions/version?limit=2", s.urlPrefix), []uint64{2, 3})
}

func (s *testRegionSuite) checkTopFlow(c *C, url string, regionIDs []uint64) {
func (s *testRegionSuite) checkTopRegions(c *C, url string, regionIDs []uint64) {
regions := &regionsInfo{}
err := readJSONWithURL(url, regions)
c.Assert(err, IsNil)
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.HandleFunc("/api/v1/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET")
router.HandleFunc("/api/v1/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET")
router.HandleFunc("/api/v1/regions/confver", regionsHandler.GetTopConfVer).Methods("GET")
router.HandleFunc("/api/v1/regions/version", regionsHandler.GetTopVersion).Methods("GET")
router.HandleFunc("/api/v1/regions/check/miss-peer", regionsHandler.GetMissPeerRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/check/extra-peer", regionsHandler.GetExtraPeerRegions).Methods("GET")
router.HandleFunc("/api/v1/regions/check/pending-peer", regionsHandler.GetPendingPeerRegions).Methods("GET")
Expand Down
48 changes: 45 additions & 3 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ type clusterInfo struct {
id core.IDAllocator
kv *core.KV
meta *metapb.Cluster
activeRegions int
opt *scheduleOption
regionStats *regionStatistics
labelLevelStats *labelLevelStatistics
prepareChecker *prepareChecker
}

func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo {
Expand All @@ -48,6 +48,7 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus
opt: opt,
kv: kv,
labelLevelStats: newLabelLevelStatistics(),
prepareChecker: newPrepareChecker(),
}
}

Expand Down Expand Up @@ -422,7 +423,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionInfo) []*core.StoreIn
func (c *clusterInfo) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return float64(c.core.Regions.Length())*collectFactor <= float64(c.activeRegions)
return c.prepareChecker.check(c)
}

// handleStoreHeartbeat updates the store status.
Expand Down Expand Up @@ -519,7 +520,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
c.Lock()
defer c.Unlock()
if isNew {
c.activeRegions++
c.prepareChecker.collect(region)
}

if saveCache {
Expand Down Expand Up @@ -710,3 +711,44 @@ func (c *clusterInfo) RegionWriteStats() []*core.RegionStat {
// RegionStats is a thread-safe method
return c.core.HotCache.RegionStats(schedule.WriteFlow)
}

type prepareChecker struct {
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
start: time.Now(),
reactiveRegions: make(map[uint64]int),
}
}

func (checker *prepareChecker) check(c *clusterInfo) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
return true
}
if float64(c.core.Regions.Length())*collectFactor > float64(checker.sum) {
return false
}
for _, store := range c.core.GetStores() {
if !store.IsUp() {
continue
}
storeID := store.GetId()
if float64(c.core.Regions.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
}
checker.isPrepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}
1 change: 1 addition & 0 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
const (
runSchedulerCheckInterval = 3 * time.Second
collectFactor = 0.8
collectTimeout = 5 * time.Minute
historyKeepTime = 5 * time.Minute
maxScheduleRetries = 10

Expand Down
18 changes: 14 additions & 4 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,19 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {

co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)

tc.addLeaderStore(1, 5)
tc.addLeaderStore(2, 2)
tc.addLeaderStore(3, 0)
tc.addLeaderStore(4, 0)
tc.LoadRegion(1, 1, 2, 3)
tc.LoadRegion(2, 1, 2, 3)
tc.LoadRegion(3, 1, 2, 3)
tc.LoadRegion(4, 1, 2, 3)
tc.LoadRegion(5, 1, 2, 3)
tc.LoadRegion(6, 2, 1, 4)
tc.LoadRegion(7, 2, 1, 4)
c.Assert(co.shouldRun(), IsFalse)
c.Assert(tc.core.Regions.GetStoreRegionCount(4), Equals, 2)

tbl := []struct {
regionID uint64
Expand All @@ -458,8 +465,11 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
{1, false},
{2, false},
{3, false},
{4, true},
{5, true},
{4, false},
{5, false},
// store4 needs collect two region
{6, false},
{7, true},
}

for _, t := range tbl {
Expand All @@ -471,7 +481,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) {
nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}}
newRegion := core.NewRegionInfo(nr, nil)
tc.handleRegionHeartbeat(newRegion)
c.Assert(co.cluster.activeRegions, Equals, 6)
c.Assert(co.cluster.prepareChecker.sum, Equals, 7)

}

Expand Down Expand Up @@ -629,8 +639,8 @@ func (s *testCoordinatorSuite) TestRestart(c *C) {
tc.addRegionStore(2, 2)
tc.addRegionStore(3, 3)
tc.addLeaderRegion(1, 1)
tc.activeRegions = 1
region := tc.GetRegion(1)
tc.prepareChecker.collect(region)

// Add 1 replica on store 2.
co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier)
Expand Down
4 changes: 0 additions & 4 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,6 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*metapb.Region {

r.regions.Put(region)

if region.leader == nil {
return overlaps
}

// Add to leaders and followers.
for _, peer := range region.GetVoters() {
storeID := peer.GetStoreId()
Expand Down
12 changes: 10 additions & 2 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,22 @@ func SetApproximateKeys(v int64) RegionCreateOption {
// SetRegionConfVer sets the config version for the reigon.
func SetRegionConfVer(confVer uint64) RegionCreateOption {
return func(region *RegionInfo) {
region.meta.RegionEpoch.ConfVer = confVer
if region.meta.RegionEpoch == nil {
region.meta.RegionEpoch = &metapb.RegionEpoch{ConfVer: confVer, Version: 1}
} else {
region.meta.RegionEpoch.ConfVer = confVer
}
}
}

// SetRegionVersion sets the version for the reigon.
func SetRegionVersion(version uint64) RegionCreateOption {
return func(region *RegionInfo) {
region.meta.RegionEpoch.Version = version
if region.meta.RegionEpoch == nil {
region.meta.RegionEpoch = &metapb.RegionEpoch{ConfVer: 1, Version: version}
} else {
region.meta.RegionEpoch.Version = version
}
}
}

Expand Down

0 comments on commit acf87a2

Please sign in to comment.