From 8f91ccd10eee7df4dde550113782390623802d94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=B7=E6=B2=8CDM?= Date: Fri, 13 Aug 2021 14:35:20 +0800 Subject: [PATCH 1/4] scheduler: use compatible config after upgrading for hot-region (#3981) * scheduler: use compatible config after upgrading for hot-region-scheduler Signed-off-by: HunDunDM * add unit test Signed-off-by: HunDunDM * add unit test Signed-off-by: HunDunDM * address comment Signed-off-by: HunDunDM Co-authored-by: Ti Chi Robot --- server/schedulers/hot_region.go | 15 ++++- server/schedulers/hot_region_config.go | 23 ++++--- server/schedulers/hot_region_test.go | 83 +++++++++++++++++++++++--- 3 files changed, 103 insertions(+), 18 deletions(-) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index efc734ed52f..fb800399d66 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -45,9 +45,20 @@ func init() { }) schedule.RegisterScheduler(HotRegionType, func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { conf := initHotRegionScheduleConfig() - if err := decoder(conf); err != nil { + + var data map[string]interface{} + if err := decoder(&data); err != nil { return nil, err } + if len(data) != 0 { + // After upgrading, use compatible config. + // For clusters with the initial version >= v5.2, it will be overwritten by the default config. 
+ conf.apply(compatibleConfig) + if err := decoder(conf); err != nil { + return nil, err + } + } + conf.storage = storage return newHotScheduler(opController, conf), nil }) @@ -395,7 +406,7 @@ func (bs *balanceSolver) init() { // For write, they are different switch bs.rwTy { case read: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), getReadLeaderPriorities) + bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), getReadPriorities) case write: switch bs.opTy { case transferLeader: diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 082b66a0c82..2a4655fefec 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -44,21 +44,21 @@ const ( ) var defaultConfig = prioritiesConfig{ - readLeader: []string{QueryPriority, BytePriority}, + read: []string{QueryPriority, BytePriority}, writeLeader: []string{KeyPriority, BytePriority}, writePeer: []string{BytePriority, KeyPriority}, } // because tikv below 5.2.0 does not report query information, we will use byte and key as the scheduling dimensions var compatibleConfig = prioritiesConfig{ - readLeader: []string{BytePriority, KeyPriority}, + read: []string{BytePriority, KeyPriority}, writeLeader: []string{KeyPriority, BytePriority}, writePeer: []string{BytePriority, KeyPriority}, } // params about hot region. 
func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { - return &hotRegionSchedulerConfig{ + cfg := &hotRegionSchedulerConfig{ MinHotByteRate: 100, MinHotKeyRate: 10, MinHotQueryRate: 10, @@ -72,12 +72,11 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { MinorDecRatio: 0.99, SrcToleranceRatio: 1.05, // Tolerate 5% difference DstToleranceRatio: 1.05, // Tolerate 5% difference - ReadPriorities: defaultConfig.readLeader, - WriteLeaderPriorities: defaultConfig.writeLeader, - WritePeerPriorities: defaultConfig.writePeer, StrictPickingStore: true, EnableForTiFlash: true, } + cfg.apply(defaultConfig) + return cfg } type hotRegionSchedulerConfig struct { @@ -314,13 +313,19 @@ func (conf *hotRegionSchedulerConfig) persist() error { } type prioritiesConfig struct { - readLeader []string + read []string writeLeader []string writePeer []string } -func getReadLeaderPriorities(c *prioritiesConfig) []string { - return c.readLeader +func (conf *hotRegionSchedulerConfig) apply(p prioritiesConfig) { + conf.ReadPriorities = append(p.read[:0:0], p.read...) + conf.WriteLeaderPriorities = append(p.writeLeader[:0:0], p.writeLeader...) + conf.WritePeerPriorities = append(p.writePeer[:0:0], p.writePeer...) 
+} + +func getReadPriorities(c *prioritiesConfig) []string { + return c.read } func getWriteLeaderPriorities(c *prioritiesConfig) []string { diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index 29a68c5c49f..b399525a66e 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -59,15 +59,16 @@ func newHotWriteScheduler(opController *schedule.OperatorController, conf *hotRe } type testHotSchedulerSuite struct{} +type testHotReadRegionSchedulerSuite struct{} +type testHotWriteRegionSchedulerSuite struct{} type testInfluenceSerialSuite struct{} type testHotCacheSuite struct{} -type testHotReadRegionSchedulerSuite struct{} -var _ = Suite(&testHotWriteRegionSchedulerSuite{}) var _ = Suite(&testHotSchedulerSuite{}) var _ = Suite(&testHotReadRegionSchedulerSuite{}) -var _ = Suite(&testHotCacheSuite{}) +var _ = Suite(&testHotWriteRegionSchedulerSuite{}) var _ = SerialSuites(&testInfluenceSerialSuite{}) +var _ = Suite(&testHotCacheSuite{}) func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { ctx, cancel := context.WithCancel(context.Background()) @@ -150,8 +151,6 @@ func newTestRegion(id uint64) *core.RegionInfo { return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0]) } -type testHotWriteRegionSchedulerSuite struct{} - func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnly(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1735,7 +1734,7 @@ func loadsEqual(loads1, loads2 []float64) bool { return true } -func (s *testHotSchedulerSuite) TestHotReadPeerSchedule(c *C) { +func (s *testHotReadRegionSchedulerSuite) TestHotReadPeerSchedule(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() opt := config.NewTestOptions() @@ -1850,7 +1849,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { hb.(*hotScheduler).clearPendingInfluence() } -func (s *testHotSchedulerSuite) 
TestHotWriteLeaderScheduleWithPriority(c *C) { +func (s *testHotWriteRegionSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() statistics.Denoising = false @@ -1951,6 +1950,76 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) { }) } +func (s *testHotSchedulerSuite) TestCompatibilityConfig(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + opt := config.NewTestOptions() + tc := mockcluster.NewCluster(ctx, opt) + + // From new or 3.x cluster + hb, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("hot-region", nil)) + c.Assert(err, IsNil) + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + {statistics.QueryDim, statistics.ByteDim}, + {statistics.KeyDim, statistics.ByteDim}, + {statistics.ByteDim, statistics.KeyDim}, + }) + + // Config file is not currently supported + hb, err = schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), + schedule.ConfigSliceDecoder("hot-region", []string{"read-priorities=byte,query"})) + c.Assert(err, IsNil) + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + {statistics.QueryDim, statistics.ByteDim}, + {statistics.KeyDim, statistics.ByteDim}, + {statistics.ByteDim, statistics.KeyDim}, + }) + + // from 4.0 or 5.0 or 5.1 cluster + var data []byte + storage := core.NewStorage(kv.NewMemoryKV()) + data, err = schedule.EncodeConfig(map[string]interface{}{ + "min-hot-byte-rate": 100, + "min-hot-key-rate": 10, + "max-zombie-rounds": 3, + "max-peer-number": 1000, + "byte-rate-rank-step-ratio": 0.05, + "key-rate-rank-step-ratio": 0.05, + "count-rank-step-ratio": 0.01, + "great-dec-ratio": 0.95, + "minor-dec-ratio": 0.99, + "src-tolerance-ratio": 1.05, + "dst-tolerance-ratio": 1.05, + }) + c.Assert(err, IsNil) + err = 
storage.SaveScheduleConfig(HotRegionName, data) + c.Assert(err, IsNil) + hb, err = schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder(data)) + c.Assert(err, IsNil) + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + {statistics.ByteDim, statistics.KeyDim}, + {statistics.KeyDim, statistics.ByteDim}, + {statistics.ByteDim, statistics.KeyDim}, + }) + + // From configured cluster + storage = core.NewStorage(kv.NewMemoryKV()) + cfg := initHotRegionScheduleConfig() + cfg.ReadPriorities = []string{"key", "query"} + cfg.WriteLeaderPriorities = []string{"query", "key"} + data, err = schedule.EncodeConfig(cfg) + c.Assert(err, IsNil) + err = storage.SaveScheduleConfig(HotRegionName, data) + c.Assert(err, IsNil) + hb, err = schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder(data)) + c.Assert(err, IsNil) + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + {statistics.KeyDim, statistics.QueryDim}, + {statistics.QueryDim, statistics.KeyDim}, + {statistics.ByteDim, statistics.KeyDim}, + }) +} + func checkPriority(c *C, hb *hotScheduler, tc *mockcluster.Cluster, dims [3][2]int) { readSolver := newBalanceSolver(hb, tc, read, transferLeader) writeLeaderSolver := newBalanceSolver(hb, tc, write, transferLeader) From e137de4904d7ad9804ab6abc4f51ecae0dd63868 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Mon, 16 Aug 2021 13:29:59 +0800 Subject: [PATCH 2/4] metrics: add alert for slow store (#3978) * metrics: add alert for slow store Signed-off-by: 5kbpers * address comment Signed-off-by: 5kbpers * Apply suggestions from code review Signed-off-by: 5kbpers Co-authored-by: Ryan Leung --- metrics/alertmanager/pd.rules.yml | 12 +++ metrics/grafana/pd.json | 146 +++++++++++++++--------------- 2 files changed, 86 insertions(+), 72 deletions(-) diff --git a/metrics/alertmanager/pd.rules.yml 
b/metrics/alertmanager/pd.rules.yml index c0be1768ab5..5a8a2485483 100644 --- a/metrics/alertmanager/pd.rules.yml +++ b/metrics/alertmanager/pd.rules.yml @@ -168,3 +168,15 @@ groups: description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values:{{ $value }}' value: '{{ $value }}' summary: PD server has been restarted + + - alert: PD_cluster_slow_tikv_nums + expr: (sum(pd_cluster_status{type="store_slow_count"}) by (instance) > 0) and (sum(etcd_server_is_leader) by (instance) > 0) + for: 1m + labels: + env: ENV_LABELS_ENV + level: critical + expr: (sum(pd_cluster_status{type="store_slow_count"}) by (instance) > 0) and (sum(etcd_server_is_leader) by (instance) > 0) + annotations: + description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values:{{ $value }}' + value: '{{ $value }}' + summary: PD_cluster_slow_tikv_nums diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index c93c3205f47..390dfdefdbb 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -687,6 +687,14 @@ "legendFormat": "Tombstone Stores", "refId": "G", "step": 20 + }, + { + "expr": "sum(pd_cluster_status{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\", type=\"store_slow_count\"})", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "Slow Stores", + "refId": "H", + "step": 20 } ], "timeFrom": "1s", @@ -1352,101 +1360,95 @@ "type": "table" }, { - "cacheTimeout":null, - "colorBackground":false, - "colorValue":false, - "colors":[ + "cacheTimeout": null, + "colorBackground": false, + "colorValue": false, + "colors": [ "#299c46", "rgba(237, 129, 40, 0.89)", "#d44a3a" ], - "datasource":"tidb-cluster", - "fieldConfig":{ - "defaults":{ - "custom":{ - - } + "datasource": "tidb-cluster", + "fieldConfig": { + "defaults": { + "custom": {} }, - "overrides":[ - - ] + "overrides": [] }, - "format":"none", - "gauge":{ - "maxValue":100, - "minValue":0, - "show":false, - "thresholdLabels":false, - "thresholdMarkers":true + "format": 
"none", + "gauge": { + "maxValue": 100, + "minValue": 0, + "show": false, + "thresholdLabels": false, + "thresholdMarkers": true }, - "gridPos":{ - "h":4, - "w":5, - "x":9, - "y":17 + "gridPos": { + "h": 4, + "w": 5, + "x": 9, + "y": 17 }, - "hideTimeOverride":true, - "id":115, - "interval":null, - "links":[ - - ], - "mappingType":1, - "mappingTypes":[ + "hideTimeOverride": true, + "id": 115, + "interval": null, + "links": [], + "mappingType": 1, + "mappingTypes": [ { - "name":"value to text", - "value":1 + "name": "value to text", + "value": 1 }, { - "name":"range to text", - "value":2 + "name": "range to text", + "value": 2 } ], - "maxDataPoints":100, - "nullPointMode":"connected", - "nullText":null, - "postfix":"", - "postfixFontSize":"50%", - "prefix":"", - "prefixFontSize":"50%", - "rangeMaps":[ + "maxDataPoints": 100, + "nullPointMode": "connected", + "nullText": null, + "postfix": "", + "postfixFontSize": "50%", + "prefix": "", + "prefixFontSize": "50%", + "rangeMaps": [ { - "from":"null", - "text":"N/A", - "to":"null" + "from": "null", + "text": "N/A", + "to": "null" } ], - "sparkline":{ - "fillColor":"rgba(31, 118, 189, 0.18)", - "full":false, - "lineColor":"rgb(31, 120, 193)", - "show":false + "sparkline": { + "fillColor": "rgba(31, 118, 189, 0.18)", + "full": false, + "lineColor": "rgb(31, 120, 193)", + "show": false }, - "tableColumn":"idalloc", - "targets":[ + "tableColumn": "idalloc", + "targets": [ { - "expr":"pd_cluster_id{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", - "format":"time_series", - "hide":false, - "instant":true, - "intervalFactor":2, - "legendFormat":"{{type}}", - "refId":"A" + "expr": "pd_cluster_id{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "format": "time_series", + "hide": false, + "instant": true, + "intervalFactor": 2, + "legendFormat": "{{type}}", + "refId": "A" } ], - "thresholds":"", - "timeFrom":"1s", - "title":"Current ID allocation", - "type":"singlestat", - "valueFontSize":"80%", - 
"valueMaps":[ + "thresholds": "", + "timeFrom": "1s", + "title": "Current ID allocation", + "type": "singlestat", + "valueFontSize": "80%", + "valueMaps": [ { - "op":"=", - "text":"N/A", - "value":"null" + "op": "=", + "text": "N/A", + "value": "null" } ], - "valueName":"avg" + "valueName": "avg" }, { "aliasColors": {}, From ab0125b7cf3b76267094f10fda080139059391f5 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 16 Aug 2021 16:13:59 +0800 Subject: [PATCH 3/4] scheduler: adjust hot region config when pd-ctl return (#3982) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adjust config when pd-ctl return Signed-off-by: lhy1024 * address comments Signed-off-by: lhy1024 * address comments Signed-off-by: lhy1024 * address comments Signed-off-by: lhy1024 * address comment Signed-off-by: lhy1024 * address comment Signed-off-by: lhy1024 * address comment Signed-off-by: lhy1024 Co-authored-by: 混沌DM --- pkg/mock/mockcluster/mockcluster.go | 7 +++ server/schedulers/hot_region.go | 51 +++++------------- server/schedulers/hot_region_config.go | 71 ++++++++++++++++++++++++- server/schedulers/hot_region_test.go | 11 ++++ tests/pdctl/helper.go | 5 +- tests/pdctl/scheduler/scheduler_test.go | 17 +++++- 6 files changed, 119 insertions(+), 43 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 53d316716d0..81b9b708752 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -733,6 +733,13 @@ func (mc *Cluster) DisableFeature(fs ...versioninfo.Feature) { } } +// EnableFeature marks that these features are supported in the cluster. +func (mc *Cluster) EnableFeature(fs ...versioninfo.Feature) { + for _, f := range fs { + delete(mc.disabledFeatures, f) + } +} + // IsFeatureSupported checks if the feature is supported by current cluster. 
func (mc *Cluster) IsFeatureSupported(f versioninfo.Feature) bool { _, ok := mc.disabledFeatures[f] diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index fb800399d66..f0a1c5d4483 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -33,7 +33,6 @@ import ( "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/statistics" - "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" ) @@ -402,52 +401,30 @@ func (bs *balanceSolver) init() { Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } + bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities()) +} + +func (bs *balanceSolver) isSelectedDim(dim int) bool { + return dim == bs.firstPriority || dim == bs.secondPriority +} + +func (bs *balanceSolver) getPriorities() []string { + querySupport := bs.sche.conf.checkQuerySupport(bs.cluster) // For read, transfer-leader and move-peer have the same priority config // For write, they are different switch bs.rwTy { case read: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetReadPriorities(), getReadPriorities) + return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities) case write: switch bs.opTy { case transferLeader: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) + return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) case movePeer: - bs.firstPriority, bs.secondPriority = bs.adjustConfig(bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) + return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) } } -} - -func (bs *balanceSolver) isSelectedDim(dim int) bool { - return dim == bs.firstPriority || dim == bs.secondPriority -} - -// adjustConfig will adjust config for cluster with low version tikv -// because 
tikv below 5.2.0 does not report query information, we will use byte and key as the scheduling dimensions -func (bs *balanceSolver) adjustConfig(origins []string, getPriorities func(*prioritiesConfig) []string) (first, second int) { - querySupport := bs.cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery) - withQuery := slice.AnyOf(origins, func(i int) bool { - return origins[i] == QueryPriority - }) - compatibles := getPriorities(&compatibleConfig) - if !querySupport && withQuery { - schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc() - return prioritiesToDim(compatibles) - } - - defaults := getPriorities(&defaultConfig) - isLegal := slice.AllOf(origins, func(i int) bool { - return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority - }) - if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] { - return prioritiesToDim(origins) - } - - if !querySupport { - schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-compatible-config").Inc() - return prioritiesToDim(compatibles) - } - schedulerCounter.WithLabelValues(bs.sche.GetName(), "use-default-config").Inc() - return prioritiesToDim(defaults) + log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String())) + return []string{} } func newBalanceSolver(sche *hotScheduler, cluster opt.Cluster, rwTy rwType, opTy opType) *balanceSolver { diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go index 2a4655fefec..08dbe5fa947 100644 --- a/server/schedulers/hot_region_config.go +++ b/server/schedulers/hot_region_config.go @@ -24,10 +24,15 @@ import ( "time" "github.com/gorilla/mux" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule" + "github.com/tikv/pd/server/schedule/opt" "github.com/tikv/pd/server/statistics" + 
"github.com/tikv/pd/server/versioninfo" "github.com/unrolled/render" + "go.uber.org/zap" ) const ( @@ -79,9 +84,33 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig { return cfg } +func (conf *hotRegionSchedulerConfig) getValidConf() *hotRegionSchedulerConfig { + return &hotRegionSchedulerConfig{ + MinHotByteRate: conf.MinHotByteRate, + MinHotKeyRate: conf.MinHotKeyRate, + MinHotQueryRate: conf.MinHotQueryRate, + MaxZombieRounds: conf.MaxZombieRounds, + MaxPeerNum: conf.MaxPeerNum, + ByteRateRankStepRatio: conf.ByteRateRankStepRatio, + KeyRateRankStepRatio: conf.KeyRateRankStepRatio, + QueryRateRankStepRatio: conf.QueryRateRankStepRatio, + CountRankStepRatio: conf.CountRankStepRatio, + GreatDecRatio: conf.GreatDecRatio, + MinorDecRatio: conf.MinorDecRatio, + SrcToleranceRatio: conf.SrcToleranceRatio, + DstToleranceRatio: conf.DstToleranceRatio, + ReadPriorities: adjustConfig(conf.lastQuerySupported, conf.ReadPriorities, getReadPriorities), + WriteLeaderPriorities: adjustConfig(conf.lastQuerySupported, conf.WriteLeaderPriorities, getWriteLeaderPriorities), + WritePeerPriorities: adjustConfig(conf.lastQuerySupported, conf.WritePeerPriorities, getWritePeerPriorities), + StrictPickingStore: conf.StrictPickingStore, + EnableForTiFlash: conf.EnableForTiFlash, + } +} + type hotRegionSchedulerConfig struct { sync.RWMutex - storage *core.Storage + storage *core.Storage + lastQuerySupported bool MinHotByteRate float64 `json:"min-hot-byte-rate"` MinHotKeyRate float64 `json:"min-hot-key-rate"` @@ -259,7 +288,7 @@ func (conf *hotRegionSchedulerConfig) handleGetConfig(w http.ResponseWriter, r * conf.RLock() defer conf.RUnlock() rd := render.New(render.Options{IndentJSON: true}) - rd.JSON(w, http.StatusOK, conf) + rd.JSON(w, http.StatusOK, conf.getValidConf()) } func (conf *hotRegionSchedulerConfig) handleSetConfig(w http.ResponseWriter, r *http.Request) { @@ -312,6 +341,19 @@ func (conf *hotRegionSchedulerConfig) persist() error { return 
conf.storage.SaveScheduleConfig(HotRegionName, data) } +func (conf *hotRegionSchedulerConfig) checkQuerySupport(cluster opt.Cluster) bool { + querySupport := cluster.IsFeatureSupported(versioninfo.HotScheduleWithQuery) + if querySupport != conf.lastQuerySupported { + log.Info("query supported changed", + zap.Bool("last-query-support", conf.lastQuerySupported), + zap.String("cluster-version", cluster.GetOpts().GetClusterVersion().String()), + zap.Reflect("config", conf), + zap.Reflect("valid-config", conf.getValidConf())) + conf.lastQuerySupported = querySupport + } + return querySupport +} + type prioritiesConfig struct { read []string writeLeader []string @@ -335,3 +377,28 @@ func getWriteLeaderPriorities(c *prioritiesConfig) []string { func getWritePeerPriorities(c *prioritiesConfig) []string { return c.writePeer } + +// adjustConfig will adjust config for cluster with low version tikv +// because tikv below 5.2.0 does not report query information, we will use byte and key as the scheduling dimensions +func adjustConfig(querySupport bool, origins []string, getPriorities func(*prioritiesConfig) []string) []string { + withQuery := slice.AnyOf(origins, func(i int) bool { + return origins[i] == QueryPriority + }) + compatibles := getPriorities(&compatibleConfig) + if !querySupport && withQuery { + return compatibles + } + + defaults := getPriorities(&defaultConfig) + isLegal := slice.AllOf(origins, func(i int) bool { + return origins[i] == BytePriority || origins[i] == KeyPriority || origins[i] == QueryPriority + }) + if len(defaults) == len(origins) && isLegal && origins[0] != origins[1] { + return origins + } + + if !querySupport { + return compatibles + } + return defaults +} diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index b399525a66e..9130e2be759 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1948,6 +1948,17 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) { 
{statistics.KeyDim, statistics.ByteDim}, {statistics.ByteDim, statistics.KeyDim}, }) + // test version change + tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version5_0)) + c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsFalse) + tc.EnableFeature(versioninfo.HotScheduleWithQuery) + c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsFalse) // it will be updated after scheduling + checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{ + {statistics.QueryDim, statistics.ByteDim}, + {statistics.KeyDim, statistics.ByteDim}, + {statistics.ByteDim, statistics.KeyDim}, + }) + c.Assert(hb.(*hotScheduler).conf.lastQuerySupported, IsTrue) } func (s *testHotSchedulerSuite) TestCompatibilityConfig(c *C) { diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 6a93ff23403..af0ee27daec 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -83,8 +83,9 @@ func CheckRegionsInfo(c *check.C, output *api.RegionsInfo, expected []*core.Regi // MustPutStore is used for test purpose. 
func MustPutStore(c *check.C, svr *server.Server, store *metapb.Store) { store.Address = fmt.Sprintf("tikv%d", store.GetId()) - store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String() - + if len(store.Version) == 0 { + store.Version = versioninfo.MinSupportedVersion(versioninfo.Version2_0).String() + } _, err := svr.PutStore(context.Background(), &pdpb.PutStoreRequest{ Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}, Store: store, diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 7ec5c054413..8c0ce48aafe 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/versioninfo" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" @@ -269,7 +270,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { "minor-dec-ratio": 0.99, "src-tolerance-ratio": 1.05, "dst-tolerance-ratio": 1.05, - "read-priorities": []interface{}{"query", "byte"}, + "read-priorities": []interface{}{"byte", "key"}, "write-leader-priorities": []interface{}{"key", "byte"}, "write-peer-priorities": []interface{}{"byte", "key"}, "strict-picking-store": "true", @@ -314,11 +315,23 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) + // test compatibility + for _, store := range stores { + version := versioninfo.HotScheduleWithQuery + store.Version = versioninfo.MinSupportedVersion(version).String() + pdctl.MustPutStore(c, leaderServer.GetServer(), store) + mustExec([]string{"-u", pdAddr, "scheduler", 
"config", "balance-hot-region-scheduler"}, &conf1) + } + conf["read-priorities"] = []interface{}{"query", "byte"} + mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) // cannot set qps as write-peer-priorities mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil) mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) c.Assert(conf1, DeepEquals, expected1) - + // test remove and add + mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil) + mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil) + c.Assert(conf1, DeepEquals, expected1) // test show scheduler with paused and disabled status. checkSchedulerWithStatusCommand := func(args []string, status string, expected []string) { if args != nil { From 7a2ab5054c4af2f6f3f71f8e6abdd76cfd085229 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Mon, 16 Aug 2021 17:42:00 +0800 Subject: [PATCH 4/4] replication: check the up stores to switch to async (#3991) Signed-off-by: nolouch Co-authored-by: Ti Chi Robot --- server/replication/replication_mode.go | 39 +++++++++++++-------- server/replication/replication_mode_test.go | 17 +++++++-- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index 4fc41e40d46..98d4579a448 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -369,21 +369,22 @@ func (m *ModeManager) tickDR() { drTickCounter.Inc() - totalPrimary, totalDr := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas - downPrimary, downDr := m.checkStoreStatus() + totalPrimaryPeers, totalDrPeers := m.config.DRAutoSync.PrimaryReplicas, m.config.DRAutoSync.DRReplicas + downPrimaryStores, downDrStores, upPrimayStores, upDrStores := 
m.checkStoreStatus() // canSync is true when every region has at least 1 replica in each DC. - canSync := downPrimary < totalPrimary && downDr < totalDr + canSync := downPrimaryStores < totalPrimaryPeers && downDrStores < totalDrPeers && + upPrimayStores > 0 && upDrStores > 0 // hasMajority is true when every region has majority peer online. var upPeers int - if downPrimary < totalPrimary { - upPeers += totalPrimary - downPrimary + if downPrimaryStores < totalPrimaryPeers { + upPeers += totalPrimaryPeers - downPrimaryStores } - if downDr < totalDr { - upPeers += totalDr - downDr + if downDrStores < totalDrPeers { + upPeers += totalDrPeers - downDrStores } - hasMajority := upPeers*2 > totalPrimary+totalDr + hasMajority := upPeers*2 > totalPrimaryPeers+totalDrPeers // If hasMajority is false, the cluster is always unavailable. Switch to async won't help. if !canSync && hasMajority && m.drGetState() != drStateAsync && m.drCheckAsyncTimeout() { @@ -407,17 +408,25 @@ func (m *ModeManager) tickDR() { } } -func (m *ModeManager) checkStoreStatus() (primaryFailCount, drFailCount int) { +func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primaryUpCount, drUpCount int) { m.RLock() defer m.RUnlock() for _, s := range m.cluster.GetStores() { - if !s.IsTombstone() && s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration { - labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) - if labelValue == m.config.DRAutoSync.Primary { - primaryFailCount++ + down := !s.IsTombstone() && s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration + labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) + if labelValue == m.config.DRAutoSync.Primary { + if down { + primaryDownCount++ + } else { + primaryUpCount++ } - if labelValue == m.config.DRAutoSync.DR { - drFailCount++ + + } + if labelValue == m.config.DRAutoSync.DR { + if down { + drDownCount++ + } else { + drUpCount++ } } } diff --git a/server/replication/replication_mode_test.go 
b/server/replication/replication_mode_test.go index 057893823da..ff6cfb7c5a6 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -166,8 +166,6 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1"}) - cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"}) - cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) // initial state is sync c.Assert(rep.drGetState(), Equals, drStateSync) @@ -178,6 +176,21 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { stateID = rep.drAutoSync.StateID } + // only one zone, sync -> async + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateAsync) + assertStateIDUpdate() + + // add new store in dr zone. + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) + // async -> sync + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSyncRecover) + rep.drSwitchToSync() + c.Assert(rep.drGetState(), Equals, drStateSync) + assertStateIDUpdate() + // sync -> async rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSync)