From 832389e3c3808e148805d0706830335faa3ed2b1 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 5 Sep 2018 14:58:44 +0800 Subject: [PATCH 1/2] server/coordinator: startup schedulers with considering the proportion of regions (#1225) --- server/cluster_info.go | 48 +++++++++++++++++++++++++++++++++++--- server/coordinator.go | 1 + server/coordinator_test.go | 18 ++++++++++---- server/core/region.go | 4 ---- 4 files changed, 60 insertions(+), 11 deletions(-) diff --git a/server/cluster_info.go b/server/cluster_info.go index d0d5a1cd9c5..61850210728 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -35,10 +35,10 @@ type clusterInfo struct { id core.IDAllocator kv *core.KV meta *metapb.Cluster - activeRegions int opt *scheduleOption regionStats *regionStatistics labelLevelStats *labelLevelStatistics + prepareChecker *prepareChecker } func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo { @@ -48,6 +48,7 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus opt: opt, kv: kv, labelLevelStats: newLabelLevelStatistics(), + prepareChecker: newPrepareChecker(), } } @@ -422,7 +423,7 @@ func (c *clusterInfo) GetFollowerStores(region *core.RegionInfo) []*core.StoreIn func (c *clusterInfo) isPrepared() bool { c.RLock() defer c.RUnlock() - return float64(c.core.Regions.Length())*collectFactor <= float64(c.activeRegions) + return c.prepareChecker.check(c) } // handleStoreHeartbeat updates the store status. @@ -520,7 +521,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { c.Lock() defer c.Unlock() if isNew { - c.activeRegions++ + c.prepareChecker.collect(region) } if saveCache { @@ -711,3 +712,44 @@ func (c *clusterInfo) RegionWriteStats() []*core.RegionStat { // RegionStats is a thread-safe method return c.core.HotCache.RegionStats(schedule.WriteFlow) } + +type prepareChecker struct { + reactiveRegions map[uint64]int + start time.Time + sum int + isPrepared bool +} + +func newPrepareChecker() *prepareChecker { + return &prepareChecker{ + start: time.Now(), + reactiveRegions: make(map[uint64]int), + } +} + +func (checker *prepareChecker) check(c *clusterInfo) bool { + if checker.isPrepared || time.Since(checker.start) > collectTimeout { + return true + } + if float64(c.core.Regions.Length())*collectFactor > float64(checker.sum) { + return false + } + for _, store := range c.core.GetStores() { + if !store.IsUp() { + continue + } + storeID := store.GetId() + if float64(c.core.Regions.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) { + return false + } + } + checker.isPrepared = true + return true +} + +func (checker *prepareChecker) collect(region *core.RegionInfo) { + for _, p := range region.GetPeers() { + checker.reactiveRegions[p.GetStoreId()]++ + } + checker.sum++ +} diff --git a/server/coordinator.go b/server/coordinator.go index f8585c3f2d0..8eaf8099322 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -34,6 +34,7 @@ import ( const ( runSchedulerCheckInterval = 3 * time.Second collectFactor = 0.8 + collectTimeout = 5 * time.Minute historyKeepTime = 5 * time.Minute maxScheduleRetries = 10 diff --git a/server/coordinator_test.go b/server/coordinator_test.go index dfd00f2335b..6fbe5b62e9c 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -444,12 +444,19 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) + tc.addLeaderStore(1, 5) + tc.addLeaderStore(2, 2) + tc.addLeaderStore(3, 0) + tc.addLeaderStore(4, 0) tc.LoadRegion(1, 1, 2, 3) tc.LoadRegion(2, 1, 2, 3) tc.LoadRegion(3, 1, 2, 3) tc.LoadRegion(4, 1, 2, 3) tc.LoadRegion(5, 1, 2, 3) + tc.LoadRegion(6, 2, 1, 4) + tc.LoadRegion(7, 2, 1, 4) c.Assert(co.shouldRun(), IsFalse) + c.Assert(tc.core.Regions.GetStoreRegionCount(4), Equals, 2) tbl := []struct { regionID uint64 @@ -458,8 +465,11 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { {1, false}, {2, false}, {3, false}, - {4, true}, - {5, true}, + {4, false}, + {5, false}, + // store4 needs collect two region + {6, false}, + {7, true}, } for _, t := range tbl { @@ -471,7 +481,7 @@ func (s *testCoordinatorSuite) TestShouldRun(c *C) { nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil) tc.handleRegionHeartbeat(newRegion) - c.Assert(co.cluster.activeRegions, Equals, 6) + c.Assert(co.cluster.prepareChecker.sum, Equals, 7) } @@ -629,8 +639,8 @@ func (s *testCoordinatorSuite) TestRestart(c *C) { tc.addRegionStore(2, 2) tc.addRegionStore(3, 3) tc.addLeaderRegion(1, 1) - tc.activeRegions = 1 region := tc.GetRegion(1) + tc.prepareChecker.collect(region) // Add 1 replica on store 2. co := newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) diff --git a/server/core/region.go b/server/core/region.go index e2cbd6bf722..021ab5a0b27 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -538,10 +538,6 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*metapb.Region { r.regions.Put(region) - if region.leader == nil { - return overlaps - } - // Add to leaders and followers. for _, peer := range region.GetVoters() { storeID := peer.GetStoreId() From 0d99adce94dd87fa3ab162830dd379fe167ec71a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 5 Sep 2018 16:51:00 +0800 Subject: [PATCH 2/2] pd-ctl: support topconfver and topversion (#1233) --- pdctl/command/region_command.go | 52 ++++++++++++++++++++++++++++++++- server/api/api.raml | 32 ++++++++++++++++++++ server/api/region.go | 12 ++++++++ server/api/region_test.go | 27 ++++++++++------- server/api/router.go | 2 ++ server/core/region_option.go | 12 ++++++-- 6 files changed, 123 insertions(+), 14 deletions(-) diff --git a/pdctl/command/region_command.go b/pdctl/command/region_command.go index f9d73dde2b6..1ea4f396908 100644 --- a/pdctl/command/region_command.go +++ b/pdctl/command/region_command.go @@ -30,6 +30,8 @@ var ( regionsCheckPrefix = "pd/api/v1/regions/check" regionsWriteflowPrefix = "pd/api/v1/regions/writeflow" regionsReadflowPrefix = "pd/api/v1/regions/readflow" + regionsConfVerPrefix = "pd/api/v1/regions/confver" + regionsVersionPrefix = "pd/api/v1/regions/version" regionsSiblingPrefix = "pd/api/v1/regions/sibling" regionIDPrefix = "pd/api/v1/region/id" regionKeyPrefix = "pd/api/v1/region/key" @@ -60,6 +62,20 @@ func NewRegionCommand() *cobra.Command { Run: showRegionTopWriteCommandFunc, } r.AddCommand(topWrite) + + topConfVer := &cobra.Command{ + Use: "topconfver ", + Short: "show regions with top conf version", + Run: showRegionTopConfVerCommandFunc, + } + r.AddCommand(topConfVer) + + topVersion := &cobra.Command{ + Use: "topversion ", + Short: "show regions with top version", + Run: showRegionTopVersionCommandFunc, + } + r.AddCommand(topVersion) r.Flags().String("jq", "", "jq query") return r @@ -121,7 +137,41 @@ func showRegionTopReadCommandFunc(cmd *cobra.Command, args []string) { fmt.Println(r) } -// NewRegionWithKeyCommand returns a region with key subcommand of regionCmd +func showRegionTopConfVerCommandFunc(cmd *cobra.Command, args []string) { + prefix := regionsConfVerPrefix + if len(args) == 1 { + if _, err := strconv.Atoi(args[0]); err != nil { + fmt.Println("limit should be a number") + return + } + prefix += "?limit=" + args[0] + } + r, err := doRequest(cmd, prefix, http.MethodGet) + if err != nil { + fmt.Printf("Failed to get regions: %s\n", err) + return + } + fmt.Println(r) +} + +func showRegionTopVersionCommandFunc(cmd *cobra.Command, args []string) { + prefix := regionsVersionPrefix + if len(args) == 1 { + if _, err := strconv.Atoi(args[0]); err != nil { + fmt.Println("limit should be a number") + return + } + prefix += "?limit=" + args[0] + } + r, err := doRequest(cmd, prefix, http.MethodGet) + if err != nil { + fmt.Printf("Failed to get regions: %s\n", err) + return + } + fmt.Println(r) +} + +// NewRegionWithKeyCommand return a region with key subcommand of regionCmd func NewRegionWithKeyCommand() *cobra.Command { r := &cobra.Command{ Use: "key [--format=raw|pb|proto|protobuf] ", diff --git a/server/api/api.raml b/server/api/api.raml index 9e7dcff22da..5a186e02c38 100644 --- a/server/api/api.raml +++ b/server/api/api.raml @@ -831,6 +831,38 @@ types: description: The input is invalid. 500: description: PD server failed to proceed the request. + /confver: + get: + description: List regions with the largest conf version. + queryParameters: + limit?: + type: integer + default: 16 + responses: + 200: + body: + application/json: + type: Regions + 400: + description: The input is invalid. + 500: + description: PD server failed to proceed the request. + /version: + get: + description: List regions with the largest version. + queryParameters: + limit?: + type: integer + default: 16 + responses: + 200: + body: + application/json: + type: Regions + 400: + description: The input is invalid. + 500: + description: PD server failed to proceed the request. /check/{filter}: uriParameters: filter: diff --git a/server/api/region.go b/server/api/region.go index 276ced65099..5b929734fd4 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -255,6 +255,18 @@ func (h *regionsHandler) GetTopReadFlow(w http.ResponseWriter, r *http.Request) h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }) } +func (h *regionsHandler) GetTopConfVer(w http.ResponseWriter, r *http.Request) { + h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer() + }) +} + +func (h *regionsHandler) GetTopVersion(w http.ResponseWriter, r *http.Request) { + h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { + return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion() + }) +} + func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) { cluster := h.svr.GetRaftCluster() if cluster == nil { diff --git a/server/api/region_test.go b/server/api/region_test.go index aca561b8d7a..4dd100d0749 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -52,10 +52,11 @@ func newTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...core StoreId: storeID, } metaRegion := &metapb.Region{ - Id: regionID, - StartKey: start, - EndKey: end, - Peers: []*metapb.Peer{leader}, + Id: regionID, + StartKey: start, + EndKey: end, + Peers: []*metapb.Peer{leader}, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}, } newOpts := []core.RegionCreateOption{ core.SetApproximateKeys(10), @@ -120,18 +121,22 @@ func (s *testRegionSuite) TestStoreRegions(c *C) { } func (s *testRegionSuite) TestTopFlow(c *C) { - r1 := newTestRegionInfo(1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000)) + r1 := newTestRegionInfo(1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) mustRegionHeartbeat(c, s.svr, r1) - r2 := newTestRegionInfo(2, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0)) + r2 := newTestRegionInfo(2, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) mustRegionHeartbeat(c, s.svr, r2) - r3 := newTestRegionInfo(3, 1, []byte("c"), []byte("d"), core.SetWrittenBytes(500), core.SetReadBytes(800)) + r3 := newTestRegionInfo(3, 1, []byte("c"), []byte("d"), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) mustRegionHeartbeat(c, s.svr, r3) - s.checkTopFlow(c, fmt.Sprintf("%s/regions/writeflow", s.urlPrefix), []uint64{2, 1, 3}) - s.checkTopFlow(c, fmt.Sprintf("%s/regions/readflow", s.urlPrefix), []uint64{1, 3, 2}) - s.checkTopFlow(c, fmt.Sprintf("%s/regions/writeflow?limit=2", s.urlPrefix), []uint64{2, 1}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/writeflow", s.urlPrefix), []uint64{2, 1, 3}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/readflow", s.urlPrefix), []uint64{1, 3, 2}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/writeflow?limit=2", s.urlPrefix), []uint64{2, 1}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/confver", s.urlPrefix), []uint64{3, 2, 1}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/confver?limit=2", s.urlPrefix), []uint64{3, 2}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/version", s.urlPrefix), []uint64{2, 3, 1}) + s.checkTopRegions(c, fmt.Sprintf("%s/regions/version?limit=2", s.urlPrefix), []uint64{2, 3}) } -func (s *testRegionSuite) checkTopFlow(c *C, url string, regionIDs []uint64) { +func (s *testRegionSuite) checkTopRegions(c *C, url string, regionIDs []uint64) { regions := ®ionsInfo{} err := readJSONWithURL(url, regions) c.Assert(err, IsNil) diff --git a/server/api/router.go b/server/api/router.go index c9c259a846a..cef5d00ebbd 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -87,6 +87,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { router.HandleFunc("/api/v1/regions/store/{id}", regionsHandler.GetStoreRegions).Methods("GET") router.HandleFunc("/api/v1/regions/writeflow", regionsHandler.GetTopWriteFlow).Methods("GET") router.HandleFunc("/api/v1/regions/readflow", regionsHandler.GetTopReadFlow).Methods("GET") + router.HandleFunc("/api/v1/regions/confver", regionsHandler.GetTopConfVer).Methods("GET") + router.HandleFunc("/api/v1/regions/version", regionsHandler.GetTopVersion).Methods("GET") router.HandleFunc("/api/v1/regions/check/miss-peer", regionsHandler.GetMissPeerRegions).Methods("GET") router.HandleFunc("/api/v1/regions/check/extra-peer", regionsHandler.GetExtraPeerRegions).Methods("GET") router.HandleFunc("/api/v1/regions/check/pending-peer", regionsHandler.GetPendingPeerRegions).Methods("GET") diff --git a/server/core/region_option.go b/server/core/region_option.go index 35d29194c72..f97d223dbaf 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -156,14 +156,22 @@ func SetApproximateKeys(v int64) RegionCreateOption { // SetRegionConfVer sets the config version for the reigon. func SetRegionConfVer(confVer uint64) RegionCreateOption { return func(region *RegionInfo) { - region.meta.RegionEpoch.ConfVer = confVer + if region.meta.RegionEpoch == nil { + region.meta.RegionEpoch = &metapb.RegionEpoch{ConfVer: confVer, Version: 1} + } else { + region.meta.RegionEpoch.ConfVer = confVer + } } } // SetRegionVersion sets the version for the reigon. func SetRegionVersion(version uint64) RegionCreateOption { return func(region *RegionInfo) { - region.meta.RegionEpoch.Version = version + if region.meta.RegionEpoch == nil { + region.meta.RegionEpoch = &metapb.RegionEpoch{ConfVer: 1, Version: version} + } else { + region.meta.RegionEpoch.Version = version + } } }