From 99200d5651e9e66165d5ea9ff1722575955bc044 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 16 Dec 2021 15:08:36 +0800 Subject: [PATCH 1/9] tso: add a local TSO prefix to prevent being conflicted with other system key paths (#4464) * Add a local TSO prefix to prevent being conflicted with other system key paths (close #4109) Signed-off-by: JmPotato * Address the comments Signed-off-by: JmPotato --- server/tso/allocator_manager.go | 13 ++++++++++--- tests/server/tso/allocator_test.go | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 07ebcd3f3b9..9d2943b7b32 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -47,7 +47,8 @@ const ( patrolStep = 1 * time.Second defaultAllocatorLeaderLease = 3 leaderTickInterval = 50 * time.Millisecond - localTSOSuffixEtcdPrefix = "local-tso-suffix" + localTSOAllocatorEtcdPrefix = "lta" + localTSOSuffixEtcdPrefix = "lts" ) var ( @@ -328,7 +329,13 @@ func (am *AllocatorManager) getAllocatorPath(dcLocation string) string { if dcLocation == GlobalDCLocation { return am.rootPath } - return path.Join(am.rootPath, dcLocation) + return path.Join(am.getLocalTSOAllocatorPath(), dcLocation) +} + +// Add a prefix to the root path to prevent being conflicted +// with other system key paths such as leader, member, alloc_id, raft, etc. +func (am *AllocatorManager) getLocalTSOAllocatorPath() string { + return path.Join(am.rootPath, localTSOAllocatorEtcdPrefix) } // similar logic with leaderLoop in server/server.go @@ -1145,7 +1152,7 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u } func (am *AllocatorManager) nextLeaderKey(dcLocation string) string { - return path.Join(am.rootPath, dcLocation, "next-leader") + return path.Join(am.getAllocatorPath(dcLocation), "next-leader") } // EnableLocalTSO returns the value of AllocatorManager.enableLocalTSO. 
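For illustration only (this block is not part of the patch): a minimal Go sketch of the key layout produced by the new helpers, assuming the PD root path has the usual "/pd/<cluster-id>" shape. With a dc-location literally named "leader", the Local TSO allocator key now nests under the "lta" prefix instead of colliding with the leader election key directly under the root path.

package main

import (
	"fmt"
	"path"
)

func main() {
	rootPath := "/pd/6803091239297874923" // hypothetical cluster root path
	dcLocation := "leader"                // a dc-location whose name clashes with a system key

	// Old layout: path.Join(rootPath, dcLocation) == "/pd/.../leader" (conflict).
	// New layout: allocator keys live under localTSOAllocatorEtcdPrefix ("lta").
	allocatorPath := path.Join(rootPath, "lta", dcLocation)
	nextLeaderKey := path.Join(allocatorPath, "next-leader")

	fmt.Println(allocatorPath) // /pd/6803091239297874923/lta/leader
	fmt.Println(nextLeaderKey) // /pd/6803091239297874923/lta/leader/next-leader
}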
diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go index 917c79fd427..44a450409b1 100644 --- a/tests/server/tso/allocator_test.go +++ b/tests/server/tso/allocator_test.go @@ -56,7 +56,7 @@ func (s *testAllocatorSuite) TestAllocatorLeader(c *C) { dcLocationConfig := map[string]string{ "pd2": "dc-1", "pd4": "dc-2", - "pd6": "dc-3", + "pd6": "leader", /* Test dc-location name is same as the special key */ } dcLocationNum := len(dcLocationConfig) cluster, err := tests.NewTestCluster(s.ctx, dcLocationNum*2, func(conf *config.Config, serverName string) { From 592e4eb9ed9a10f72f98aecbde033351c0be4ad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Thu, 16 Dec 2021 16:44:36 +0800 Subject: [PATCH 2/9] ctl: the duration of the pause scheduler must be specified (#4421) * ctl: the duration of the pause scheduler must be specified close #4420 Signed-off-by: HunDunDM * fix flag Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * fix test Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM Co-authored-by: Ti Chi Robot --- server/api/scheduler.go | 4 +- tests/pdctl/scheduler/scheduler_test.go | 9 +++++ tools/pd-ctl/pdctl/command/scheduler.go | 52 ++++++++++++++----------- 3 files changed, 40 insertions(+), 25 deletions(-) diff --git a/server/api/scheduler.go b/server/api/scheduler.go index cc39d97ebec..019f1d3982d 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -321,7 +321,7 @@ func (h *schedulerHandler) redirectSchedulerDelete(w http.ResponseWriter, name, // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /schedulers/{name} [post] func (h *schedulerHandler) PauseOrResume(w http.ResponseWriter, r *http.Request) { - var input map[string]int + var input map[string]int64 if err := apiutil.ReadJSONRespondError(h.r, w, r.Body, &input); err != nil { return } @@ -332,7 +332,7 @@ func (h *schedulerHandler) PauseOrResume(w http.ResponseWriter, r *http.Request) h.r.JSON(w, http.StatusBadRequest, "missing pause time") return } - if err := h.PauseOrResumeScheduler(name, int64(t)); err != nil { + if err := h.PauseOrResumeScheduler(name, t); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) return } diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 4ead8304dfb..471b4ab9149 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -93,6 +93,12 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { return "" } + mustUsage := func(args []string) { + output, err := pdctl.ExecuteCommand(cmd, args...) 
+ c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Usage"), IsTrue) + } + checkSchedulerCommand := func(args []string, expected map[string]bool) { if args != nil { mustExec(args, nil) @@ -365,10 +371,13 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { c.Assert(schedulers, DeepEquals, expected) } + mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) mustExec([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) checkSchedulerWithStatusCommand(nil, "paused", []string{ "balance-leader-scheduler", }) + + mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) checkSchedulerWithStatusCommand(nil, "paused", nil) diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index 7c4c4f2355f..1a6a64626f9 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -54,29 +54,45 @@ func NewSchedulerCommand() *cobra.Command { // NewPauseSchedulerCommand returns a command to pause a scheduler. func NewPauseSchedulerCommand() *cobra.Command { c := &cobra.Command{ - Use: "pause ", + Use: "pause ", Short: "pause a scheduler", - Run: pauseOrResumeSchedulerCommandFunc, + Run: pauseSchedulerCommandFunc, } return c } -func pauseOrResumeSchedulerCommandFunc(cmd *cobra.Command, args []string) { - if len(args) != 2 && len(args) != 1 { +func pauseSchedulerCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 2 { + cmd.Usage() + return + } + delay, err := strconv.ParseInt(args[1], 10, 64) + if err != nil || delay <= 0 { cmd.Usage() return } path := schedulersPrefix + "/" + args[0] - input := make(map[string]interface{}) - input["delay"] = 0 - if len(args) == 2 { - delay, err := strconv.Atoi(args[1]) - if err != nil { - cmd.Usage() - return - } - input["delay"] = delay + input := map[string]interface{}{"delay": delay} + postJSON(cmd, path, input) +} + +// NewResumeSchedulerCommand returns a command to resume a scheduler. +func NewResumeSchedulerCommand() *cobra.Command { + c := &cobra.Command{ + Use: "resume ", + Short: "resume a scheduler", + Run: resumeSchedulerCommandFunc, } + return c +} + +func resumeSchedulerCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } + path := schedulersPrefix + "/" + args[0] + input := map[string]interface{}{"delay": 0} postJSON(cmd, path, input) } @@ -91,16 +107,6 @@ func NewShowSchedulerCommand() *cobra.Command { return c } -// NewResumeSchedulerCommand returns a command to resume a scheduler. 
-func NewResumeSchedulerCommand() *cobra.Command { - c := &cobra.Command{ - Use: "resume ", - Short: "resume a scheduler", - Run: pauseOrResumeSchedulerCommandFunc, - } - return c -} - func showSchedulerCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 0 { cmd.Println(cmd.UsageString()) From f569e3a3dfcec9683618f8a15ababa5e3ce70271 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Thu, 16 Dec 2021 21:14:36 +0800 Subject: [PATCH 3/9] test: increase test lease time to tolerate higher delayed time (#4472) Signed-off-by: bufferflies <1045931706@qq.com> --- server/testutil.go | 2 +- tools/pd-simulator/simulator/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/testutil.go b/server/testutil.go index c96441b0e0f..2534008a5b5 100644 --- a/server/testutil.go +++ b/server/testutil.go @@ -70,7 +70,7 @@ func NewTestSingleConfig(c *assertutil.Checker) *config.Config { InitialClusterState: embed.ClusterStateFlagNew, - LeaderLease: 1, + LeaderLease: 10, TSOSaveInterval: typeutil.NewDuration(200 * time.Millisecond), } diff --git a/tools/pd-simulator/simulator/config.go b/tools/pd-simulator/simulator/config.go index e92f740b21f..ad027a81c24 100644 --- a/tools/pd-simulator/simulator/config.go +++ b/tools/pd-simulator/simulator/config.go @@ -34,7 +34,7 @@ const ( defaultStoreIOMBPerSecond = 40 defaultStoreVersion = "2.1.0" // server - defaultLeaderLease = 1 + defaultLeaderLease = 3 defaultTSOSaveInterval = 200 * time.Millisecond defaultTickInterval = 100 * time.Millisecond defaultElectionInterval = 3 * time.Second From 1aa8ef0f4d9de82d05bb7ed06540799fe29cff46 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Fri, 17 Dec 2021 14:16:36 +0800 Subject: [PATCH 4/9] schedulers: add switch for hot scheduler to forbid read or write scheduler (#4262) * add switch for hot scheduler to forbid read or write scheduler Signed-off-by: bufferflies <1045931706@qq.com> * checkout should simple Signed-off-by: bufferflies <1045931706@qq.com> * disable forbid-rw-type show Signed-off-by: bufferflies <1045931706@qq.com> * add annation to describe why not move earlier Signed-off-by: bufferflies <1045931706@qq.com> * remove noneed changes Signed-off-by: bufferflies <1045931706@qq.com> * close #4261 Signed-off-by: bufferflies <1045931706@qq.com> * rwType replace Signed-off-by: bufferflies <1045931706@qq.com> --- server/schedulers/hot_region.go | 4 ++++ server/schedulers/hot_region_config.go | 9 +++++++++ tests/pdctl/scheduler/scheduler_test.go | 3 +++ 3 files changed, 16 insertions(+) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 9d8829f3ee6..e9e4ecc4cbe 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -157,6 +157,10 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster opt.Cluster) []*o defer h.Unlock() h.prepareForBalance(typ, cluster) + // it can not move earlier to support to use api and metrics. 
+ if h.conf.IsForbidRWType(typ) { + return nil + } switch typ { case statistics.Read: diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 8dc81bf6ce6..506f775e7d5 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -80,6 +80,7 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { DstToleranceRatio: 1.05, // Tolerate 5% difference StrictPickingStore: true, EnableForTiFlash: true, + ForbidRWType: "none", } cfg.apply(defaultConfig) return cfg @@ -138,6 +139,8 @@ type hotRegionSchedulerConfig struct { // Separately control whether to start hotspot scheduling for TiFlash EnableForTiFlash bool `json:"enable-for-tiflash,string"` + // forbid read or write scheduler, only for test + ForbidRWType string `json:"forbid-rw-type,omitempty"` } func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) { @@ -278,6 +281,12 @@ func (conf *hotRegionSchedulerConfig) IsStrictPickingStoreEnabled() bool { return conf.StrictPickingStore } +func (conf *hotRegionSchedulerConfig) IsForbidRWType(rw statistics.RWType) bool { + conf.RLock() + defer conf.RUnlock() + return rw.String() == conf.ForbidRWType +} + func (conf *hotRegionSchedulerConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) { router := mux.NewRouter() router.HandleFunc("/list", conf.handleGetConfig).Methods("GET") diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 471b4ab9149..9ee1292b68b 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -344,6 +344,9 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) + c.Assert(conf1, DeepEquals, expected1) // test compatibility for _, store := range stores { version := versioninfo.HotScheduleWithQuery From 72452204054bdd06f98d4e6cc4876d33377607be Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 20 Dec 2021 17:15:46 +0800 Subject: [PATCH 5/9] statistics: fix hot peer cache (#4446) * fix hot peer cache Signed-off-by: lhy1024 * fix Signed-off-by: lhy1024 * fix Signed-off-by: lhy1024 * add more test Signed-off-by: lhy1024 * fix ci Signed-off-by: lhy1024 * address comment Signed-off-by: lhy1024 * ref #4390 Signed-off-by: lhy1024 * add comment and test Signed-off-by: lhy1024 * address comments Signed-off-by: lhy1024 * fix Signed-off-by: lhy1024 * add more test Signed-off-by: lhy1024 * add comment Signed-off-by: lhy1024 Co-authored-by: ShuNing --- pkg/movingaverage/avg_over_time.go | 5 + server/statistics/hot_peer.go | 21 +- server/statistics/hot_peer_cache.go | 26 ++- server/statistics/hot_peer_cache_test.go | 258 +++++++++++++++-------- 4 files changed, 211 insertions(+), 99 deletions(-) diff --git a/pkg/movingaverage/avg_over_time.go b/pkg/movingaverage/avg_over_time.go index b64358baf8e..35f85ab8253 100644 --- a/pkg/movingaverage/avg_over_time.go +++ b/pkg/movingaverage/avg_over_time.go @@ -117,3 +117,8 @@ func (aot *AvgOverTime) Clone() *AvgOverTime { avgInterval: aot.avgInterval, } } + +// GetIntervalSum 
returns the sum of interval +func (aot *AvgOverTime) GetIntervalSum() time.Duration { + return aot.intervalSum +} diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index caf7fee747f..e7055a38af3 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -109,6 +109,11 @@ type HotPeerStat struct { // If the peer didn't been send by store heartbeat when it is already stored as hot peer stat, // we will handle it as cold peer and mark the inCold flag inCold bool + // source represents the statistics item source, such as directly, inherit, adopt. + source sourceKind + // If the item in storeA is just adopted from storeB, + // then other store, such as storeC, will be forbidden to adopt from storeA until the item in storeA is hot. + allowAdopt bool } // ID returns region ID. Implementing TopNItem. @@ -127,15 +132,18 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi zap.Uint64("interval", stat.interval), zap.Uint64("region-id", stat.RegionID), zap.Uint64("store", stat.StoreID), + zap.Bool("is-leader", stat.isLeader), + zap.String("type", stat.Kind.String()), zap.Float64s("loads", stat.GetLoads()), zap.Float64s("loads-instant", stat.Loads), zap.Float64s("thresholds", stat.thresholds), zap.Int("hot-degree", stat.HotDegree), zap.Int("hot-anti-count", stat.AntiCount), - zap.Bool("just-transfer-leader", stat.justTransferLeader), - zap.Bool("is-leader", stat.isLeader), + zap.Duration("sum-interval", stat.getIntervalSum()), zap.Bool("need-delete", stat.IsNeedDelete()), - zap.String("type", stat.Kind.String()), + zap.String("source", stat.source.String()), + zap.Bool("allow-adopt", stat.allowAdopt), + zap.Bool("just-transfer-leader", stat.justTransferLeader), zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime)) } @@ -211,3 +219,10 @@ func (stat *HotPeerStat) hotStatReportInterval() int { } return WriteReportInterval } + +func (stat *HotPeerStat) getIntervalSum() time.Duration { + if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil { + return 0 + } + return stat.rollingLoads[0].LastAverage.GetIntervalSum() +} diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index 68ef4c28126..c5a08db5dd8 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -101,7 +101,9 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { // Update updates the items in statistics. 
func (f *hotPeerCache) Update(item *HotPeerStat) { if item.IsNeedDelete() { - f.putInheritItem(item) + if item.AntiCount > 0 { // means it's deleted because expired rather than cold + f.putInheritItem(item) + } f.removeItem(item) item.Log("region heartbeat delete from cache", log.Debug) } else { @@ -185,25 +187,25 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf interval: interval, peers: peers, thresholds: thresholds, + source: direct, } - source := direct if oldItem == nil { inheritItem := f.takeInheritItem(region.GetID()) if inheritItem != nil { oldItem = inheritItem - source = inherit + newItem.source = inherit } else { for _, storeID := range f.getAllStoreIDs(region) { oldItem = f.getOldHotPeerStat(region.GetID(), storeID) - if oldItem != nil { - source = adopt + if oldItem != nil && oldItem.allowAdopt { + newItem.source = adopt break } } } } - return f.updateHotPeerStat(newItem, oldItem, source, deltaLoads, time.Duration(interval)*time.Second) + return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) } // CheckColdPeer checks the collect the un-heartbeat peer and maintain it. @@ -240,7 +242,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st for i, loads := range oldItem.thresholds { deltaLoads[i] = loads * float64(interval) } - stat := f.updateHotPeerStat(newItem, oldItem, direct, deltaLoads, time.Duration(interval)*time.Second) + stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) if stat != nil { ret = append(ret, stat) } @@ -377,18 +379,20 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second) } -func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, source sourceKind, deltaLoads []float64, interval time.Duration) *HotPeerStat { +func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { regionStats := f.kind.RegionStats() if oldItem == nil { return f.updateNewHotPeerStat(newItem, deltaLoads, interval) } - if source == adopt { + if newItem.source == adopt { for _, dim := range oldItem.rollingLoads { newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone()) } + newItem.allowAdopt = false } else { newItem.rollingLoads = oldItem.rollingLoads + newItem.allowAdopt = oldItem.allowAdopt } if newItem.justTransferLeader { @@ -519,12 +523,15 @@ func coldItem(newItem, oldItem *HotPeerStat) { newItem.AntiCount = oldItem.AntiCount - 1 if newItem.AntiCount <= 0 { newItem.needDelete = true + } else { + newItem.allowAdopt = true } } func hotItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree + 1 newItem.AntiCount = hotRegionAntiCount + newItem.allowAdopt = true if newItem.Kind == Read { newItem.AntiCount = hotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval) } @@ -533,6 +540,7 @@ func hotItem(newItem, oldItem *HotPeerStat) { func initItemDegree(item *HotPeerStat) { item.HotDegree = 1 item.AntiCount = hotRegionAntiCount + item.allowAdopt = true if item.Kind == Read { item.AntiCount = hotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval) } diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index a86dc2439d6..c3ce13f5fb5 100644 --- a/server/statistics/hot_peer_cache_test.go +++ 
b/server/statistics/hot_peer_cache_test.go @@ -65,6 +65,7 @@ const ( transferLeader operator = iota movePeer addReplica + removeReplica ) type testCacheCase struct { @@ -94,11 +95,11 @@ func testCache(c *C, t *testCacheCase) { Write: 3, // all peers } cache := NewHotPeerCache(t.kind) - region := buildRegion(nil, nil, t.kind) + region := buildRegion(t.kind, 3, 60) checkAndUpdate(c, cache, region, defaultSize[t.kind]) checkHit(c, cache, region, t.kind, false) // all peers are new - srcStore, region := schedule(t.operator, region, t.kind) + srcStore, region := schedule(c, t.operator, region, 10) res := checkAndUpdate(c, cache, region, t.expect) checkHit(c, cache, region, t.kind, true) // hit cache if t.expect != defaultSize[t.kind] { @@ -106,37 +107,65 @@ func testCache(c *C, t *testCacheCase) { } } -func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect int) (res []*HotPeerStat) { +func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer { + var peers []*metapb.Peer + for _, peer := range region.GetPeers() { + if cache.getOldHotPeerStat(region.GetID(), peer.StoreId) != nil { + peers = append([]*metapb.Peer{peer}, peers...) + } else { + peers = append(peers, peer) + } + } + return peers +} + +func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.CollectExpiredItems(region)...) - for _, peer := range region.GetPeers() { + for _, peer := range peers { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) item := cache.CheckPeerFlow(peerInfo, region) if item != nil { res = append(res, item) } } - c.Assert(res, HasLen, expect) + return res +} + +func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { for _, p := range res { cache.Update(p) } return res } -func checkAndUpdateSync(cache *hotPeerCache, region *core.RegionInfo) (res []*HotPeerStat) { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - res = append(res, cache.CollectExpiredItems(region)...) - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.CheckPeerFlow(peerInfo, region) - if item != nil { - res = append(res, item) - cache.Update(item) - } +type check func(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) + +func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { + res = checkFlow(cache, region, region.GetPeers()) + if len(expect) != 0 { + c.Assert(res, HasLen, expect[0]) } - return res + return updateFlow(cache, res) +} + +// Check and update peers in the specified order that old item that he items that have not expired come first, and the items that have expired come second. +// This order is also similar to the previous version. By the way the order in now version is random. 
+func checkAndUpdateWithOrdering(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { + res = checkFlow(cache, region, orderingPeers(cache, region)) + if len(expect) != 0 { + c.Assert(res, HasLen, expect[0]) + } + return updateFlow(cache, res) +} + +func checkAndUpdateSkipOne(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { + res = checkFlow(cache, region, region.GetPeers()[1:]) + if len(expect) != 0 { + c.Assert(res, HasLen, expect[0]) + } + return updateFlow(cache, res) } func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, isHit bool) { @@ -162,21 +191,32 @@ func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64, needDelete bool) } } -func schedule(operator operator, region *core.RegionInfo, kind RWType) (srcStore uint64, _ *core.RegionInfo) { +func schedule(c *C, operator operator, region *core.RegionInfo, targets ...uint64) (srcStore uint64, _ *core.RegionInfo) { switch operator { case transferLeader: _, newLeader := pickFollower(region) - return region.GetLeader().StoreId, buildRegion(region.GetMeta(), newLeader, kind) + return region.GetLeader().StoreId, region.Clone(core.WithLeader(newLeader)) case movePeer: + c.Assert(targets, HasLen, 1) index, _ := pickFollower(region) - meta := region.GetMeta() - srcStore := meta.Peers[index].StoreId - meta.Peers[index] = &metapb.Peer{Id: 4, StoreId: 4} - return srcStore, buildRegion(meta, region.GetLeader(), kind) + srcStore := region.GetPeers()[index].StoreId + region := region.Clone(core.WithAddPeer(&metapb.Peer{Id: targets[0]*10 + 1, StoreId: targets[0]})) + region = region.Clone(core.WithRemoveStorePeer(srcStore)) + return srcStore, region case addReplica: - meta := region.GetMeta() - meta.Peers = append(meta.Peers, &metapb.Peer{Id: 4, StoreId: 4}) - return 0, buildRegion(meta, region.GetLeader(), kind) + c.Assert(targets, HasLen, 1) + region := region.Clone(core.WithAddPeer(&metapb.Peer{Id: targets[0]*10 + 1, StoreId: targets[0]})) + return 0, region + case removeReplica: + if len(targets) == 0 { + index, _ := pickFollower(region) + srcStore = region.GetPeers()[index].StoreId + } else { + srcStore = targets[0] + } + region = region.Clone(core.WithRemoveStorePeer(srcStore)) + return srcStore, region + default: return 0, nil } @@ -198,30 +238,39 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { return dst, meta.Peers[dst] } -func buildRegion(meta *metapb.Region, leader *metapb.Peer, kind RWType) *core.RegionInfo { - const interval = uint64(60) - if meta == nil { - peer1 := &metapb.Peer{Id: 1, StoreId: 1} - peer2 := &metapb.Peer{Id: 2, StoreId: 2} - peer3 := &metapb.Peer{Id: 3, StoreId: 3} - - meta = &metapb.Region{ - Id: 1000, - Peers: []*metapb.Peer{peer1, peer2, peer3}, - StartKey: []byte(""), - EndKey: []byte(""), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, - } - leader = meta.Peers[rand.Intn(3)] +func buildRegion(kind RWType, peerCount int, interval uint64) *core.RegionInfo { + peers := newPeers(peerCount, + func(i int) uint64 { return uint64(10000 + i) }, + func(i int) uint64 { return uint64(i) }) + meta := &metapb.Region{ + Id: 1000, + Peers: peers, + StartKey: []byte(""), + EndKey: []byte(""), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, } + leader := meta.Peers[rand.Intn(3)] + switch kind { case Read: - return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), - core.SetReadBytes(interval*100*1024)) + return core.NewRegionInfo( + meta, + leader, + 
core.SetReportInterval(interval), + core.SetReadBytes(10*1024*1024*interval), + core.SetReadKeys(10*1024*1024*interval), + core.SetReadQuery(1024*interval), + ) case Write: - return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), - core.SetWrittenBytes(interval*100*1024)) + return core.NewRegionInfo( + meta, + leader, + core.SetReportInterval(interval), + core.SetWrittenBytes(10*1024*1024*interval), + core.SetWrittenKeys(10*1024*1024*interval), + core.SetWrittenQuery(1024*interval), + ) default: return nil } @@ -248,50 +297,50 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { // skip interval=0 newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 0) + newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 0) c.Check(newItem, IsNil) // new peer, interval is larger than report interval, but no hot newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) c.Check(newItem, IsNil) // new peer, interval is less than report interval newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem, NotNil) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is less than report interval oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is less than report interval oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 2) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and cold oldItem = newItem newItem.thresholds = []float64{10.0, 10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m-1) 
// sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) } c.Check(newItem.HotDegree, Less, 0) c.Check(newItem.AntiCount, Equals, 0) @@ -330,7 +379,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true { break } - item := cache.updateHotPeerStat(newItem, oldItem, direct, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) + item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) cache.Update(item) } thresholds := cache.calcHotThresholds(storeID) @@ -343,43 +392,78 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold } func (t *testHotPeerCache) TestRemoveFromCache(c *C) { + checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} + for _, checker := range checkers { + cache := NewHotPeerCache(Write) + region := buildRegion(Write, 3, 5) + // prepare + for i := 1; i <= 200; i++ { + checker(c, cache, region) + } + // make the interval sum of peers are different + checkAndUpdateSkipOne(c, cache, region) + var intervalSums []int + for _, peer := range region.GetPeers() { + oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) + intervalSums = append(intervalSums, int(oldItem.getIntervalSum())) + } + c.Assert(intervalSums, HasLen, 3) + c.Assert(intervalSums[0], Not(Equals), intervalSums[1]) + c.Assert(intervalSums[0], Not(Equals), intervalSums[2]) + // check whether cold cache is cleared + var isClear bool + region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) + for i := 1; i <= 200; i++ { + checker(c, cache, region) + if len(cache.storesOfRegion[region.GetID()]) == 0 { + isClear = true + break + } + } + c.Assert(isClear, IsTrue) + } +} + +func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) { peerCounts := []int{3, 5} - intervals := []uint64{120, 60, 10} + intervals := []uint64{120, 60, 10, 5} + checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} for _, peerCount := range peerCounts { for _, interval := range intervals { - cache := NewHotPeerCache(Write) - peers := newPeers(peerCount, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) - meta := &metapb.Region{ - Id: 1000, - Peers: peers, - StartKey: []byte(""), - EndKey: []byte(""), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, - } - region := core.NewRegionInfo( - meta, - peers[0], - core.SetReportInterval(interval), - core.SetWrittenBytes(10*1024*1024*interval), - core.SetWrittenKeys(10*1024*1024*interval), - core.SetWrittenQuery(1024*interval), - ) - for i := 1; i <= 200; i++ { - checkAndUpdate(c, cache, region, peerCount) - } - c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount) - var isClear bool - region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) - for i := 1; i <= 200; i++ { - checkAndUpdateSync(cache, region) - if len(cache.storesOfRegion[region.GetID()]) == 0 { - isClear = true - break + for _, checker := range checkers { + cache := NewHotPeerCache(Write) + region := buildRegion(Write, peerCount, interval) + + target := uint64(10) + 
movePeer := func() { + tmp := uint64(0) + tmp, region = schedule(c, removeReplica, region) + _, region = schedule(c, addReplica, region, target) + target = tmp + } + // prepare with random move peer to make the interval sum of peers are different + for i := 1; i <= 200; i++ { + if i%5 == 0 { + movePeer() + } + checker(c, cache, region) + } + c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount) + // check whether cold cache is cleared + var isClear bool + region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) + for i := 1; i <= 200; i++ { + if i%5 == 0 { + movePeer() + } + checker(c, cache, region) + if len(cache.storesOfRegion[region.GetID()]) == 0 { + isClear = true + break + } } + c.Assert(isClear, IsTrue) } - c.Assert(isClear, IsTrue) } } } From af174e6401299e0d2ac8ef72a5f4beb48be535e2 Mon Sep 17 00:00:00 2001 From: tomato <38561029+qidi1@users.noreply.github.com> Date: Mon, 20 Dec 2021 17:53:46 +0800 Subject: [PATCH 6/9] core:fix style of hot history region info of startkey and endkey,fix bug isleader always false (#4450) bugfix:change the style of data store in levledb bugfix:remove omit empty ref #4449 Signed-off-by: qidi1 <1083369179@qq.com> Co-authored-by: Ti Chi Robot --- conf/config.toml | 2 +- server/core/hot_region_storage.go | 43 +++++++++++++------------- server/core/hot_region_storage_test.go | 22 +++++++++++-- server/handler.go | 11 ++++--- 4 files changed, 47 insertions(+), 31 deletions(-) diff --git a/conf/config.toml b/conf/config.toml index 9cf246eb5a2..252d5a2938e 100644 --- a/conf/config.toml +++ b/conf/config.toml @@ -119,7 +119,7 @@ ## Controls the time interval between write hot regions info into leveldb # hot-regions-write-interval= "10m" ## The day of hot regions data to be reserved. 0 means close. -# hot-regions-reserved-days= "7" +# hot-regions-reserved-days= 7 ## The number of Leader scheduling tasks performed at the same time. # leader-schedule-limit = 4 ## The number of Region scheduling tasks performed at the same time. 
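As background for the struct-tag change in the next hunk (illustrative only, not part of the patch): with `omitempty`, zero values such as is_leader=false and an empty start key are dropped entirely from the JSON written to LevelDB, which is why the tags are switched to plain field names. A small stand-alone Go example of the difference, using hypothetical stand-in structs:

package main

import (
	"encoding/json"
	"fmt"
)

// hypothetical stand-ins for the two tag styles
type withOmitEmpty struct {
	IsLeader bool   `json:"is_leader,omitempty"`
	StartKey string `json:"start_key,omitempty"`
}

type withoutOmitEmpty struct {
	IsLeader bool   `json:"is_leader"`
	StartKey string `json:"start_key"`
}

func main() {
	a, _ := json.Marshal(withOmitEmpty{})    // false / "" are omitted entirely
	b, _ := json.Marshal(withoutOmitEmpty{}) // every field is serialized
	fmt.Println(string(a)) // {}
	fmt.Println(string(b)) // {"is_leader":false,"start_key":""}
}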
diff --git a/server/core/hot_region_storage.go b/server/core/hot_region_storage.go index 0e4289fd432..4800275eb28 100644 --- a/server/core/hot_region_storage.go +++ b/server/core/hot_region_storage.go @@ -62,19 +62,19 @@ type HistoryHotRegions struct { // HistoryHotRegion wraps hot region info // it is storage format of hot_region_storage type HistoryHotRegion struct { - UpdateTime int64 `json:"update_time,omitempty"` - RegionID uint64 `json:"region_id,omitempty"` - PeerID uint64 `json:"peer_id,omitempty"` - StoreID uint64 `json:"store_id,omitempty"` - IsLeader bool `json:"is_leader,omitempty"` - IsLearner bool `json:"is_learner,omitempty"` - HotRegionType string `json:"hot_region_type,omitempty"` - HotDegree int64 `json:"hot_degree,omitempty"` - FlowBytes float64 `json:"flow_bytes,omitempty"` - KeyRate float64 `json:"key_rate,omitempty"` - QueryRate float64 `json:"query_rate,omitempty"` - StartKey []byte `json:"start_key,omitempty"` - EndKey []byte `json:"end_key,omitempty"` + UpdateTime int64 `json:"update_time"` + RegionID uint64 `json:"region_id"` + PeerID uint64 `json:"peer_id"` + StoreID uint64 `json:"store_id"` + IsLeader bool `json:"is_leader"` + IsLearner bool `json:"is_learner"` + HotRegionType string `json:"hot_region_type"` + HotDegree int64 `json:"hot_degree"` + FlowBytes float64 `json:"flow_bytes"` + KeyRate float64 `json:"key_rate"` + QueryRate float64 `json:"query_rate"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` // Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key. // IV for end_key is calculated from (encryption_meta.iv + len(start_key)). // The field is only used by PD and should be ignored otherwise. @@ -254,16 +254,16 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR for i := range historyHotRegions { region := &metapb.Region{ Id: historyHotRegions[i].RegionID, - StartKey: historyHotRegions[i].StartKey, - EndKey: historyHotRegions[i].EndKey, + StartKey: HexRegionKey([]byte(historyHotRegions[i].StartKey)), + EndKey: HexRegionKey([]byte(historyHotRegions[i].EndKey)), EncryptionMeta: historyHotRegions[i].EncryptionMeta, } region, err := encryption.EncryptRegion(region, h.encryptionKeyManager) if err != nil { return err } - historyHotRegions[i].StartKey = region.StartKey - historyHotRegions[i].EndKey = region.EndKey + historyHotRegions[i].StartKey = String(region.StartKey) + historyHotRegions[i].EndKey = String(region.EndKey) key := HotRegionStorePath(hotRegionType, historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID) h.batchHotInfo[key] = &historyHotRegions[i] } @@ -338,21 +338,20 @@ func (it *HotRegionStorageIterator) Next() (*HistoryHotRegion, error) { } region := &metapb.Region{ Id: message.RegionID, - StartKey: message.StartKey, - EndKey: message.EndKey, + StartKey: []byte(message.StartKey), + EndKey: []byte(message.EndKey), EncryptionMeta: message.EncryptionMeta, } if err := encryption.DecryptRegion(region, it.encryptionKeyManager); err != nil { return nil, err } - message.StartKey = region.StartKey - message.EndKey = region.EndKey + message.StartKey = String(region.StartKey) + message.EndKey = String(region.EndKey) message.EncryptionMeta = nil return &message, nil } // HotRegionStorePath generate hot region store key for HotRegionStorage. -// TODO:find a better place to put this function. 
func HotRegionStorePath(hotRegionType string, updateTime int64, regionID uint64) string { return path.Join( "schedule", diff --git a/server/core/hot_region_storage_test.go b/server/core/hot_region_storage_test.go index 11dd7c641f3..62859c47881 100644 --- a/server/core/hot_region_storage_test.go +++ b/server/core/hot_region_storage_test.go @@ -15,6 +15,7 @@ package core import ( "context" + "encoding/json" "fmt" "log" "math/rand" @@ -63,8 +64,8 @@ func (m *MockPackHotRegionInfo) GenHistoryHotRegions(num int, updateTime time.Ti FlowBytes: rand.Float64() * 100, KeyRate: rand.Float64() * 100, QueryRate: rand.Float64() * 100, - StartKey: []byte(fmt.Sprintf("%20d", i)), - EndKey: []byte(fmt.Sprintf("%20d", i)), + StartKey: fmt.Sprintf("%20d", i), + EndKey: fmt.Sprintf("%20d", i), } if i%2 == 1 { m.historyHotWrites = append(m.historyHotWrites, historyHotRegion) @@ -103,20 +104,33 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) { RegionID: 1, StoreID: 1, HotRegionType: ReadType.String(), + StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), + EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), }, { UpdateTime: now.Add(10*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 2, StoreID: 1, HotRegionType: ReadType.String(), + StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), + EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), }, { UpdateTime: now.Add(20*time.Second).UnixNano() / int64(time.Millisecond), RegionID: 3, StoreID: 1, HotRegionType: ReadType.String(), + StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x83, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), + EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x83, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}), }, } + var copyHotRegionStorages []HistoryHotRegion + data, _ := json.Marshal(hotRegionStorages) + json.Unmarshal(data, ©HotRegionStorages) + for i, region := range hotRegionStorages { + copyHotRegionStorages[i].StartKey = region.StartKey + copyHotRegionStorages[i].EndKey = region.EndKey + } packHotRegionInfo.historyHotReads = hotRegionStorages packHotRegionInfo.historyHotWrites = []HistoryHotRegion{ { @@ -133,7 +147,9 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) { now.Add(40*time.Second).UnixNano()/int64(time.Millisecond)) index := 0 for next, err := iter.Next(); next != nil && err == nil; next, err = iter.Next() { - c.Assert(reflect.DeepEqual(&hotRegionStorages[index], next), IsTrue) + copyHotRegionStorages[index].StartKey = HexRegionKeyStr([]byte(copyHotRegionStorages[index].StartKey)) + copyHotRegionStorages[index].EndKey = HexRegionKeyStr([]byte(copyHotRegionStorages[index].EndKey)) + c.Assert(reflect.DeepEqual(©HotRegionStorages[index], next), IsTrue) index++ } c.Assert(err, IsNil) diff --git a/server/handler.go b/server/handler.go index 8125f26b454..df3285120b6 100644 --- a/server/handler.go +++ b/server/handler.go @@ -994,7 +994,8 @@ func (h *Handler) packHotRegions(hotPeersStat 
statistics.StoreHotPeersStat, hotR for _, peer := range meta.Peers { if peer.StoreId == hotPeerStat.StoreID { peerID = peer.Id - isLearner = peer.Role == metapb.PeerRole_Learner + isLearner = core.IsLearner(peer) + break } } stat := core.HistoryHotRegion{ @@ -1003,15 +1004,15 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR RegionID: hotPeerStat.RegionID, StoreID: hotPeerStat.StoreID, PeerID: peerID, - IsLeader: meta.Id == region.GetLeader().Id, + IsLeader: peerID == region.GetLeader().Id, IsLearner: isLearner, HotDegree: int64(hotPeerStat.HotDegree), FlowBytes: hotPeerStat.ByteRate, KeyRate: hotPeerStat.KeyRate, QueryRate: hotPeerStat.QueryRate, - StartKey: meta.StartKey, - EndKey: meta.EndKey, - EncryptionMeta: meta.EncryptionMeta, + StartKey: string(region.GetStartKey()), + EndKey: string(region.GetEndKey()), + EncryptionMeta: meta.GetEncryptionMeta(), HotRegionType: hotRegionType, } historyHotRegions = append(historyHotRegions, stat) From 447fbd4592bcf98eee3013cb7c5a06a7d91c672e Mon Sep 17 00:00:00 2001 From: tomato <38561029+qidi1@users.noreply.github.com> Date: Tue, 21 Dec 2021 14:22:00 +0800 Subject: [PATCH 7/9] command:add ctl command to get hot history regions(ref #25281) (#4103) * command:add hot-history-regions command(ref #pingcap/tidb/issues/25281) Signed-off-by: qidi1 <1083369179@qq.com> --- tests/pdctl/hot/hot_test.go | 112 +++++++++++++++++ tools/pd-ctl/pdctl/command/global.go | 95 +++++++++----- tools/pd-ctl/pdctl/command/hot_command.go | 146 +++++++++++++++++++++- 3 files changed, 320 insertions(+), 33 deletions(-) diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 07033d81aea..d938c74a517 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -17,6 +17,7 @@ package hot_test import ( "context" "encoding/json" + "strconv" "testing" "time" @@ -233,3 +234,114 @@ func (s *hotTestSuite) TestHotWithStoreID(c *C) { c.Assert(hotRegion.AsLeader[1].TotalBytesRate, Equals, float64(200000000)) c.Assert(hotRegion.AsLeader[2].TotalBytesRate, Equals, float64(100000000)) } + +func (s *hotTestSuite) TestHistoryHotRegions(c *C) { + statistics.Denoising = false + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1, + func(cfg *config.Config, serverName string) { + cfg.Schedule.HotRegionCacheHitsThreshold = 0 + cfg.Schedule.HotRegionsWriteInterval.Duration = 1000 * time.Millisecond + cfg.Schedule.HotRegionsReservedDays = 1 + }, + ) + c.Assert(err, IsNil) + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + pdAddr := cluster.GetConfig().GetClientURL() + cmd := pdctlCmd.GetRootCmd() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + for _, store := range stores { + pdctl.MustPutStore(c, leaderServer.GetServer(), store) + } + defer cluster.Destroy() + startTime := time.Now().UnixNano() / int64(time.Millisecond) + pdctl.MustPutRegion(c, cluster, 1, 1, []byte("a"), []byte("b"), core.SetWrittenBytes(3000000000), core.SetReportInterval(statistics.WriteReportInterval)) + pdctl.MustPutRegion(c, cluster, 2, 2, []byte("c"), []byte("d"), 
core.SetWrittenBytes(6000000000), core.SetReportInterval(statistics.WriteReportInterval)) + pdctl.MustPutRegion(c, cluster, 3, 1, []byte("e"), []byte("f"), core.SetWrittenBytes(9000000000), core.SetReportInterval(statistics.WriteReportInterval)) + pdctl.MustPutRegion(c, cluster, 4, 3, []byte("g"), []byte("h"), core.SetWrittenBytes(9000000000), core.SetReportInterval(statistics.WriteReportInterval)) + // wait hot scheduler starts + time.Sleep(5000 * time.Millisecond) + endTime := time.Now().UnixNano() / int64(time.Millisecond) + start := strconv.FormatInt(startTime, 10) + end := strconv.FormatInt(endTime, 10) + args := []string{"-u", pdAddr, "hot", "history", + start, end, + "hot_region_type", "write", + "region_id", "1,2", + "store_id", "1,4", + "is_learner", "false", + } + output, e := pdctl.ExecuteCommand(cmd, args...) + hotRegions := core.HistoryHotRegions{} + c.Assert(e, IsNil) + c.Assert(json.Unmarshal(output, &hotRegions), IsNil) + regions := hotRegions.HistoryHotRegion + c.Assert(len(regions), Equals, 1) + c.Assert(regions[0].RegionID, Equals, uint64(1)) + c.Assert(regions[0].StoreID, Equals, uint64(1)) + c.Assert(regions[0].HotRegionType, Equals, "write") + args = []string{"-u", pdAddr, "hot", "history", + start, end, + "hot_region_type", "write", + "region_id", "1,2", + "store_id", "1,2", + } + output, e = pdctl.ExecuteCommand(cmd, args...) + c.Assert(e, IsNil) + c.Assert(json.Unmarshal(output, &hotRegions), IsNil) + regions = hotRegions.HistoryHotRegion + c.Assert(len(regions), Equals, 2) + isSort := regions[0].UpdateTime > regions[1].UpdateTime || regions[0].RegionID < regions[1].RegionID + c.Assert(isSort, Equals, true) + args = []string{"-u", pdAddr, "hot", "history", + start, end, + "hot_region_type", "read", + "is_leader", "false", + "peer_id", "12", + } + output, e = pdctl.ExecuteCommand(cmd, args...) + c.Assert(e, IsNil) + c.Assert(json.Unmarshal(output, &hotRegions), IsNil) + c.Assert(len(hotRegions.HistoryHotRegion), Equals, 0) + args = []string{"-u", pdAddr, "hot", "history"} + output, e = pdctl.ExecuteCommand(cmd, args...) + c.Assert(e, IsNil) + c.Assert(json.Unmarshal(output, &hotRegions), NotNil) + args = []string{"-u", pdAddr, "hot", "history", + start, end, + "region_id", "dada", + } + output, e = pdctl.ExecuteCommand(cmd, args...) + c.Assert(e, IsNil) + c.Assert(json.Unmarshal(output, &hotRegions), NotNil) + args = []string{"-u", pdAddr, "hot", "history", + start, end, + "region_ids", "12323", + } + output, e = pdctl.ExecuteCommand(cmd, args...) 
+ c.Assert(e, IsNil) + c.Assert(json.Unmarshal(output, &hotRegions), NotNil) +} diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 58141d09915..bc6dcaa82ea 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -80,26 +80,21 @@ func doRequest(cmd *cobra.Command, prefix string, method string, endpoints := getEndpoints(cmd) err := tryURLs(cmd, endpoints, func(endpoint string) error { - var err error - url := endpoint + "/" + prefix - if method == "" { - method = http.MethodGet - } - var req *http.Request + return doGet(endpoint, prefix, method, &resp, b) + }) + return resp, err +} - req, err = http.NewRequest(method, url, b.body) - if err != nil { - return err - } - if b.contentType != "" { - req.Header.Set("Content-Type", b.contentType) - } - // the resp would be returned by the outer function - resp, err = dial(req) - if err != nil { - return err - } - return nil +func doRequestSingleEndpoint(cmd *cobra.Command, endpoint, prefix, method string, + opts ...BodyOption) (string, error) { + b := &bodyOption{} + for _, o := range opts { + o(b) + } + var resp string + + err := requestURL(cmd, endpoint, func(endpoint string) error { + return doGet(endpoint, prefix, method, &resp, b) }) return resp, err } @@ -134,20 +129,11 @@ type DoFunc func(endpoint string) error func tryURLs(cmd *cobra.Command, endpoints []string, f DoFunc) error { var err error for _, endpoint := range endpoints { - var u *url.URL - u, err = url.Parse(endpoint) + endpoint, err = checkURL(endpoint) if err != nil { - cmd.Println("address format is wrong, should like 'http://127.0.0.1:2379' or '127.0.0.1:2379'") + cmd.Println(err.Error()) os.Exit(1) } - // tolerate some schemes that will be used by users, the TiKV SDK - // use 'tikv' as the scheme, it is really confused if we do not - // support it by pd-ctl - if u.Scheme == "" || u.Scheme == "pd" || u.Scheme == "tikv" { - u.Scheme = "http" - } - - endpoint = u.String() err = f(endpoint) if err != nil { continue @@ -160,6 +146,15 @@ func tryURLs(cmd *cobra.Command, endpoints []string, f DoFunc) error { return err } +func requestURL(cmd *cobra.Command, endpoint string, f DoFunc) error { + endpoint, err := checkURL(endpoint) + if err != nil { + cmd.Println(err.Error()) + os.Exit(1) + } + return f(endpoint) +} + func getEndpoints(cmd *cobra.Command) []string { addrs, err := cmd.Flags().GetString("pd") if err != nil { @@ -207,3 +202,43 @@ func postJSON(cmd *cobra.Command, prefix string, input map[string]interface{}) { } cmd.Println("Success!") } + +// doGet send a get request to server. 
+func doGet(endpoint, prefix, method string, resp *string, b *bodyOption) error { + var err error + url := endpoint + "/" + prefix + if method == "" { + method = http.MethodGet + } + var req *http.Request + + req, err = http.NewRequest(method, url, b.body) + if err != nil { + return err + } + if b.contentType != "" { + req.Header.Set("Content-Type", b.contentType) + } + // the resp would be returned by the outer function + *resp, err = dial(req) + if err != nil { + return err + } + return nil +} + +func checkURL(endpoint string) (string, error) { + var u *url.URL + u, err := url.Parse(endpoint) + if err != nil { + return "", errors.Errorf("address format is wrong, should like 'http://127.0.0.1:2379' or '127.0.0.1:2379'") + } + // tolerate some schemes that will be used by users, the TiKV SDK + // use 'tikv' as the scheme, it is really confused if we do not + // support it by pd-ctl + if u.Scheme == "" || u.Scheme == "pd" || u.Scheme == "tikv" { + u.Scheme = "http" + } + + return u.String(), nil +} diff --git a/tools/pd-ctl/pdctl/command/hot_command.go b/tools/pd-ctl/pdctl/command/hot_command.go index 93e44d84056..0d72f69f4b0 100644 --- a/tools/pd-ctl/pdctl/command/hot_command.go +++ b/tools/pd-ctl/pdctl/command/hot_command.go @@ -15,17 +15,23 @@ package command import ( + "bytes" + "encoding/json" "net/http" + "sort" "strconv" + "strings" "github.com/pingcap/errors" "github.com/spf13/cobra" + "github.com/tikv/pd/server/core" ) const ( - hotReadRegionsPrefix = "pd/api/v1/hotspot/regions/read" - hotWriteRegionsPrefix = "pd/api/v1/hotspot/regions/write" - hotStoresPrefix = "pd/api/v1/hotspot/stores" + hotReadRegionsPrefix = "pd/api/v1/hotspot/regions/read" + hotWriteRegionsPrefix = "pd/api/v1/hotspot/regions/write" + hotStoresPrefix = "pd/api/v1/hotspot/stores" + hotRegionsHistoryPrefix = "pd/api/v1/hotspot/regions/history" ) // NewHotSpotCommand return a hot subcommand of rootCmd @@ -37,6 +43,7 @@ func NewHotSpotCommand() *cobra.Command { cmd.AddCommand(NewHotWriteRegionCommand()) cmd.AddCommand(NewHotReadRegionCommand()) cmd.AddCommand(NewHotStoreCommand()) + cmd.AddCommand(NewHotRegionsHistoryCommand()) return cmd } @@ -107,6 +114,61 @@ func showHotStoresCommandFunc(cmd *cobra.Command, args []string) { cmd.Println(r) } +// NewHotRegionsHistoryCommand return a hot history regions subcommand of hotSpotCmd +func NewHotRegionsHistoryCommand() *cobra.Command { + cmd := &cobra.Command{ + // TODO + // Need a better description. 
+ Use: "history [ ]", + Short: "show the hot history regions", + Run: showHotRegionsHistoryCommandFunc, + } + return cmd +} + +func showHotRegionsHistoryCommandFunc(cmd *cobra.Command, args []string) { + if len(args) < 2 || len(args)%2 != 0 { + cmd.Println(cmd.UsageString()) + return + } + input, err := parseHotRegionsHistoryArgs(args) + if err != nil { + cmd.Printf("Failed to get history hotspot: %s\n", err) + return + } + data, _ := json.Marshal(input) + endpoints := getEndpoints(cmd) + hotRegions := &core.HistoryHotRegions{} + for _, endpoint := range endpoints { + tempHotRegions := core.HistoryHotRegions{} + resp, err := doRequestSingleEndpoint(cmd, endpoint, hotRegionsHistoryPrefix, + http.MethodGet, WithBody("application/json", bytes.NewBuffer(data))) + if err != nil { + cmd.Printf("Failed to get history hotspot: %s\n", err) + return + } + err = json.Unmarshal([]byte(resp), &tempHotRegions) + if err != nil { + cmd.Printf("Failed to get history hotspot: %s\n", err) + return + } + hotRegions.HistoryHotRegion = append(hotRegions.HistoryHotRegion, tempHotRegions.HistoryHotRegion...) + } + historyHotRegions := hotRegions.HistoryHotRegion + sort.SliceStable(historyHotRegions, func(i, j int) bool { + if historyHotRegions[i].UpdateTime > historyHotRegions[j].UpdateTime { + return true + } + return historyHotRegions[i].RegionID < historyHotRegions[j].RegionID + }) + resp, err := json.Marshal(hotRegions) + if err != nil { + cmd.Printf("Failed to get history hotspot: %s\n", err) + return + } + cmd.Println(string(resp)) +} + func parseOptionalArgs(prefix string, args []string) (string, error) { argsLen := len(args) if argsLen > 0 { @@ -124,3 +186,81 @@ func parseOptionalArgs(prefix string, args []string) (string, error) { } return prefix, nil } + +func parseHotRegionsHistoryArgs(args []string) (map[string]interface{}, error) { + startTime, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + return nil, errors.Errorf("start_time should be a number,but got %s", args[0]) + } + endTime, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + return nil, errors.Errorf("end_time should be a number,but got %s", args[1]) + } + input := map[string]interface{}{ + "start_time": startTime, + "end_time": endTime, + } + stringToIntSlice := func(s string) ([]int64, error) { + results := make([]int64, 0) + args := strings.Split(s, ",") + for _, arg := range args { + result, err := strconv.ParseInt(arg, 10, 64) + if err != nil { + return nil, err + } + results = append(results, result) + } + return results, nil + } + for index := 2; index < len(args); index += 2 { + switch args[index] { + case "hot_region_type": + input["hot_region_type"] = []string{args[index+1]} + case "region_id": + results, err := stringToIntSlice(args[index+1]) + if err != nil { + return nil, errors.Errorf("region_id should be a number slice,but got %s", args[index+1]) + } + input["region_ids"] = results + case "store_id": + results, err := stringToIntSlice(args[index+1]) + if err != nil { + return nil, errors.Errorf("store_id should be a number slice,but got %s", args[index+1]) + } + input["store_ids"] = results + case "peer_id": + results, err := stringToIntSlice(args[index+1]) + if err != nil { + return nil, errors.Errorf("peer_id should be a number slice,but got %s", args[index+1]) + } + input["peer_ids"] = results + case "is_leader": + isLeader, err := strconv.ParseBool(args[index+1]) + if err != nil { + return nil, errors.Errorf("is_leader should be a bool,but got %s", args[index+1]) + } + input["is_leaders"] = 
[]bool{isLeader} + case "is_learner": + isLearner, err := strconv.ParseBool(args[index+1]) + if err != nil { + return nil, errors.Errorf("is_learner should be a bool,but got %s", args[index+1]) + } + input["is_learners"] = []bool{isLearner} + default: + return nil, errors.Errorf("key should be one of hot_region_type,region_id,store_id,peer_id,is_leader,is_learner") + } + } + if _, ok := input["is_leaders"]; !ok { + input["is_leaders"] = []bool{ + true, + false, + } + } + if _, ok := input["is_learners"]; !ok { + input["is_learners"] = []bool{ + true, + false, + } + } + return input, nil +} From 95293d51c17744599e31b76e3d18ae612c516a0c Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Tue, 21 Dec 2021 16:53:46 +0800 Subject: [PATCH 8/9] ctl: pd-ctl cmd will work well in char device mode (#4479) * pd-ctl cmd will work well in crontab && close 4478 Signed-off-by: bufferflies <1045931706@qq.com> * add unit test && close #4478 Signed-off-by: bufferflies <1045931706@qq.com> * string nil and add return Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: Ti Chi Robot --- tools/pd-ctl/main.go | 11 ++++------- tools/pd-ctl/pdctl/ctl.go | 12 ++++++++++++ tools/pd-ctl/pdctl/ctl_test.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/tools/pd-ctl/main.go b/tools/pd-ctl/main.go index 9d969483be0..3ee1956a56e 100644 --- a/tools/pd-ctl/main.go +++ b/tools/pd-ctl/main.go @@ -16,10 +16,8 @@ package main import ( "fmt" - "io" "os" "os/signal" - "strings" "syscall" "github.com/tikv/pd/tools/pd-ctl/pdctl" @@ -49,16 +47,15 @@ func main() { } }() - var input []string + var inputs []string stat, _ := os.Stdin.Stat() if (stat.Mode() & os.ModeCharDevice) == 0 { - b, err := io.ReadAll(os.Stdin) + in, err := pdctl.ReadStdin(os.Stdin) if err != nil { fmt.Println(err) return } - input = strings.Split(strings.TrimSpace(string(b)), " ") + inputs = in } - - pdctl.MainStart(append(os.Args[1:], input...)) + pdctl.MainStart(append(os.Args[1:], inputs...)) } diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index b6cb2fd162f..ef3c1467208 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -198,3 +198,15 @@ func genCompleter(cmd *cobra.Command) []readline.PrefixCompleterInterface { } return pc } + +// ReadStdin convert stdin to string array +func ReadStdin(r io.Reader) (input []string, err error) { + b, err := io.ReadAll(r) + if err != nil { + return nil, err + } + if s := strings.TrimSpace(string(b)); len(s) > 0 { + input = strings.Split(s, " ") + } + return input, nil +} diff --git a/tools/pd-ctl/pdctl/ctl_test.go b/tools/pd-ctl/pdctl/ctl_test.go index cf3d2840811..90369bab46c 100644 --- a/tools/pd-ctl/pdctl/ctl_test.go +++ b/tools/pd-ctl/pdctl/ctl_test.go @@ -15,6 +15,8 @@ package pdctl import ( + "io" + "strings" "testing" "github.com/spf13/cobra" @@ -68,3 +70,30 @@ func TestGenCompleter(t *testing.T) { } } } + +func TestReadStdin(t *testing.T) { + s := []struct { + in io.Reader + targets []string + }{{ + in: strings.NewReader(""), + targets: []string{}, + }, { + in: strings.NewReader("a b c"), + targets: []string{"a", "b", "c"}, + }} + for _, v := range s { + in, err := ReadStdin(v.in) + if err != nil { + t.Errorf("ReadStdin err:%v", err) + } + if len(v.targets) != len(in) { + t.Errorf("ReadStdin = %v, want %s, nil", in, v.targets) + } + for i, target := range v.targets { + if target != in[i] { + t.Errorf("ReadStdin = %v, want %s, nil", in, v.targets) + } + } + } +} From 
39fb207563d5eb4f6e15cfc71e1236f449f20b8e Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 22 Dec 2021 14:57:47 +0800 Subject: [PATCH 9/9] api: Revert for showing State in StoreInfo (#4485) * Revert "fix #3816: recovered pdctl/store_test && add comment to function onlu used by test" This reverts commit acc531f55207446a4160905973aef76cffe91569. Signed-off-by: Cabinfever_B * Revert "fix #3816: fix code format problem" This reverts commit 0f0f65a17461904b5933bfb971a0e8be0a909654. Signed-off-by: Cabinfever_B * Revert "fix #3816: fix code format problem" This reverts commit b36673f52d2ba9e1861ecdab60f043df468902af. Signed-off-by: Cabinfever_B * Revert "fix #3816: fix code format problem" This reverts commit 8c281746bfcbbf9ff2807449c0d9612761d8e540. Signed-off-by: Cabinfever_B * Revert "Fix #3816 delete unnecessary field and fix unit test" This reverts commit acc942e948114939f88fd740160e270cd98fd47b. Signed-off-by: Cabinfever_B * revert #4334 Signed-off-by: Cabinfever_B * revert #4334: empty commit for add signed-off-by Signed-off-by: Cabinfever_B Co-authored-by: JmPotato --- server/api/store.go | 57 ++++------------------------- server/api/store_test.go | 63 +++++---------------------------- server/api/trend.go | 4 +-- tests/cluster.go | 2 +- tests/pdctl/helper.go | 3 +- tests/pdctl/store/store_test.go | 6 ++-- 6 files changed, 22 insertions(+), 113 deletions(-) diff --git a/server/api/store.go b/server/api/store.go index 50d9297f376..b3caca88643 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -36,56 +36,10 @@ import ( "github.com/unrolled/render" ) -// MetaStore contains meta information about a store which needed to show. +// MetaStore contains meta information about a store. type MetaStore struct { - StoreID uint64 `json:"id,omitempty"` - Address string `json:"address,omitempty"` - Labels []*metapb.StoreLabel `json:"labels,omitempty"` - Version string `json:"version,omitempty"` - PeerAddress string `json:"peer_address,omitempty"` - StatusAddress string `json:"status_address,omitempty"` - GitHash string `json:"git_hash,omitempty"` - StartTimestamp int64 `json:"start_timestamp,omitempty"` - DeployPath string `json:"deploy_path,omitempty"` - LastHeartbeat int64 `json:"last_heartbeat,omitempty"` - PhysicallyDestroyed bool `json:"physically_destroyed,omitempty"` - StateName string `json:"state_name"` -} - -// NewMetaStore convert metapb.Store to MetaStore without State -func NewMetaStore(store *metapb.Store, stateName string) *MetaStore { - metaStore := &MetaStore{StateName: stateName} - metaStore.StoreID = store.GetId() - metaStore.Address = store.GetAddress() - metaStore.Labels = store.GetLabels() - metaStore.Version = store.GetVersion() - metaStore.PeerAddress = store.GetPeerAddress() - metaStore.StatusAddress = store.GetStatusAddress() - metaStore.GitHash = store.GetGitHash() - metaStore.StartTimestamp = store.GetStartTimestamp() - metaStore.DeployPath = store.GetDeployPath() - metaStore.LastHeartbeat = store.GetLastHeartbeat() - metaStore.PhysicallyDestroyed = store.GetPhysicallyDestroyed() - return metaStore -} - -// ConvertToMetapbStore convert to metapb.Store -// For test only. 
-func (m *MetaStore) ConvertToMetapbStore() *metapb.Store { - metapbStore := &metapb.Store{ - Id: m.StoreID, - Address: m.Address, - State: metapb.StoreState(metapb.StoreState_value[m.StateName]), - Labels: m.Labels, - Version: m.Version, - PeerAddress: m.PeerAddress, - StatusAddress: m.StatusAddress, - GitHash: m.GitHash, - StartTimestamp: m.StartTimestamp, - DeployPath: m.DeployPath, - LastHeartbeat: m.LastHeartbeat, - } - return metapbStore + *metapb.Store + StateName string `json:"state_name"` } // StoreStatus contains status about a store. @@ -123,7 +77,10 @@ const ( func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo { s := &StoreInfo{ - Store: NewMetaStore(store.GetMeta(), store.GetState().String()), + Store: &MetaStore{ + Store: store.GetMeta(), + StateName: store.GetState().String(), + }, Status: &StoreStatus{ Capacity: typeutil.ByteSize(store.GetCapacity()), Available: typeutil.ByteSize(store.GetAvailable()), diff --git a/server/api/store_test.go b/server/api/store_test.go index c59d7dbfcc6..80d741ceecf 100644 --- a/server/api/store_test.go +++ b/server/api/store_test.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" - "github.com/tikv/pd/server/versioninfo" ) var _ = Suite(&testStoreSuite{}) @@ -115,8 +114,7 @@ func checkStoresInfo(c *C, ss []*StoreInfo, want []*metapb.Store) { } } for _, s := range ss { - metapbStore := s.Store.ConvertToMetapbStore() - obtained := proto.Clone(metapbStore).(*metapb.Store) + obtained := proto.Clone(s.Store.Store).(*metapb.Store) expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store) // Ignore lastHeartbeat obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0 @@ -168,51 +166,6 @@ func (s *testStoreSuite) TestStoreGet(c *C) { checkStoresInfo(c, []*StoreInfo{info}, s.stores[:1]) } -func (s *testStoreSuite) TestStoreInfoGet(c *C) { - timeStamp := time.Now().Unix() - url := fmt.Sprintf("%s/store/1112", s.urlPrefix) - _, errPut := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{ - Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, - Store: &metapb.Store{ - Id: 1112, - Address: fmt.Sprintf("tikv%d", 1112), - State: 1, - Labels: nil, - Version: versioninfo.MinSupportedVersion(versioninfo.Version5_0).String(), - StatusAddress: fmt.Sprintf("tikv%d", 1112), - GitHash: "45ce5b9584d618bc777877bea77cb94f61b8410", - StartTimestamp: timeStamp, - DeployPath: "/home/test", - LastHeartbeat: timeStamp, - }, - }) - c.Assert(errPut, IsNil) - - info := new(StoreInfo) - - err := readJSON(testDialClient, url, info) - c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String()) - c.Assert(info.Store.StoreID, Equals, uint64(1112)) - c.Assert(info.Store.Address, Equals, "tikv1112") - c.Assert(info.Store.Version, Equals, versioninfo.MinSupportedVersion(versioninfo.Version5_0).String()) - c.Assert(info.Store.StatusAddress, Equals, fmt.Sprintf("tikv%d", 1112)) - c.Assert(info.Store.GitHash, Equals, "45ce5b9584d618bc777877bea77cb94f61b8410") - c.Assert(info.Store.StartTimestamp, Equals, timeStamp) - c.Assert(info.Store.DeployPath, Equals, "/home/test") - c.Assert(info.Store.LastHeartbeat, Equals, timeStamp) - - resp, err := testDialClient.Get(url) - c.Assert(err, IsNil) - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - c.Assert(err, IsNil) - str := string(b) - c.Assert(strings.Contains(str, "\"state\""), Equals, false) - s.cleanup() - s.SetUpSuite(c) -} - func (s *testStoreSuite) TestStoreLabel(c 
*C) { url := fmt.Sprintf("%s/store/1", s.urlPrefix) var info StoreInfo @@ -309,7 +262,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) { err := readJSON(testDialClient, url, store) c.Assert(err, IsNil) c.Assert(store.Store.PhysicallyDestroyed, IsFalse) - c.Assert(store.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(store.Store.State, Equals, metapb.StoreState_Offline) // up store success because it is offline but not physically destroyed status := requestStatusBody(c, testDialClient, http.MethodPost, fmt.Sprintf("%s/state?state=Up", url)) @@ -320,7 +273,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) { store = new(StoreInfo) err = readJSON(testDialClient, url, store) c.Assert(err, IsNil) - c.Assert(store.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(store.Store.State, Equals, metapb.StoreState_Up) c.Assert(store.Store.PhysicallyDestroyed, IsFalse) // offline store with physically destroyed @@ -328,7 +281,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) { c.Assert(status, Equals, http.StatusOK) err = readJSON(testDialClient, url, store) c.Assert(err, IsNil) - c.Assert(store.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(store.Store.State, Equals, metapb.StoreState_Offline) c.Assert(store.Store.PhysicallyDestroyed, IsTrue) // try to up store again failed because it is physically destroyed @@ -344,7 +297,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { info := StoreInfo{} err := readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Up) // Set to Offline. info = StoreInfo{} @@ -352,7 +305,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { c.Assert(err, IsNil) err = readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Offline) // store not found info = StoreInfo{} @@ -367,7 +320,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { c.Assert(err, NotNil) err = readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Offline) } // Set back to Up. 
@@ -376,7 +329,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) { c.Assert(err, IsNil) err = readJSON(testDialClient, url, &info) c.Assert(err, IsNil) - c.Assert(info.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(info.Store.State, Equals, metapb.StoreState_Up) } func (s *testStoreSuite) TestUrlStoreFilter(c *C) { diff --git a/server/api/trend.go b/server/api/trend.go index 564319af163..542d2d9e966 100644 --- a/server/api/trend.go +++ b/server/api/trend.go @@ -130,8 +130,8 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) { for _, store := range stores { info := newStoreInfo(h.svr.GetScheduleConfig(), store) s := trendStore{ - ID: info.Store.StoreID, - Address: info.Store.Address, + ID: info.Store.GetId(), + Address: info.Store.GetAddress(), StateName: info.Store.StateName, Capacity: uint64(info.Status.Capacity), Available: uint64(info.Status.Available), diff --git a/tests/cluster.go b/tests/cluster.go index c0768bec9d8..4d42ee38806 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -361,7 +361,7 @@ func (s *TestServer) GetStoreRegions(storeID uint64) []*core.RegionInfo { func (s *TestServer) BootstrapCluster() error { bootstrapReq := &pdpb.BootstrapRequest{ Header: &pdpb.RequestHeader{ClusterId: s.GetClusterID()}, - Store: &metapb.Store{Id: 1, Address: "mock://1", LastHeartbeat: time.Now().UnixNano()}, + Store: &metapb.Store{Id: 1, Address: "mock://1"}, Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, } _, err := s.grpcServer.Bootstrap(context.Background(), bootstrapReq) diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 67d991871b1..66e675edb7f 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -51,8 +51,7 @@ func CheckStoresInfo(c *check.C, stores []*api.StoreInfo, want []*metapb.Store) } } for _, s := range stores { - metapbStore := s.Store.ConvertToMetapbStore() - obtained := proto.Clone(metapbStore).(*metapb.Store) + obtained := proto.Clone(s.Store.Store).(*metapb.Store) expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store) // Ignore lastHeartbeat obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0 diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 59b9927bb93..e89115ef2c6 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -248,7 +248,7 @@ func (s *storeTestSuite) TestStore(c *C) { c.Assert(ok, IsFalse) // store delete command - c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String()) + c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Up) args = []string{"-u", pdAddr, "store", "delete", "1"} _, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) @@ -256,7 +256,7 @@ func (s *storeTestSuite) TestStore(c *C) { output, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) c.Assert(json.Unmarshal(output, &storeInfo), IsNil) - c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Offline) // store check status args = []string{"-u", pdAddr, "store", "check", "Offline"} @@ -285,7 +285,7 @@ func (s *storeTestSuite) TestStore(c *C) { output, err = pdctl.ExecuteCommand(cmd, args...) 
c.Assert(err, IsNil) c.Assert(json.Unmarshal(output, &storeInfo), IsNil) - c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Offline.String()) + c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Offline) // store remove-tombstone args = []string{"-u", pdAddr, "store", "remove-tombstone"}