From 1cd99f1aff6509b767514ce51c36e4ba50b16f60 Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Fri, 12 Apr 2024 11:45:23 +0800
Subject: [PATCH 1/5] pdctl: support top query in pdctl (#7843)

close tikv/pd#7369

Signed-off-by: lhy1024
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 pkg/core/region_option.go                    | 36 ++++++++-
 server/api/region.go                         | 26 ++++++
 server/api/router.go                         |  2 +
 tools/pd-ctl/pdctl/command/region_command.go | 51 ++++++++++--
 tools/pd-ctl/tests/config/config_test.go     | 28 +++++++
 tools/pd-ctl/tests/helper.go                 |  9 +++
 tools/pd-ctl/tests/region/region_test.go     | 85 ++++++++++++++------
 7 files changed, 204 insertions(+), 33 deletions(-)

diff --git a/pkg/core/region_option.go b/pkg/core/region_option.go
index 36db7cf3460..e973a1e7c1f 100644
--- a/pkg/core/region_option.go
+++ b/pkg/core/region_option.go
@@ -248,18 +248,23 @@ func SetReadKeys(v uint64) RegionCreateOption {
 
 // SetReadQuery sets the read query for the region, only used for unit test.
 func SetReadQuery(v uint64) RegionCreateOption {
-	q := RandomKindReadQuery(v)
-	return SetQueryStats(q)
+	return func(region *RegionInfo) {
+		resetReadQuery(region.queryStats)
+		region.queryStats = mergeQueryStat(region.queryStats, RandomKindReadQuery(v))
+	}
 }
 
 // SetWrittenQuery sets the write query for the region, only used for unit test.
 func SetWrittenQuery(v uint64) RegionCreateOption {
-	q := RandomKindWriteQuery(v)
-	return SetQueryStats(q)
+	return func(region *RegionInfo) {
+		resetWriteQuery(region.queryStats)
+		region.queryStats = mergeQueryStat(region.queryStats, RandomKindWriteQuery(v))
+	}
 }
 
 // SetQueryStats sets the query stats for the region.
 // This func is only used for unit test.
+// It will cover previous statistic.
 func SetQueryStats(v *pdpb.QueryStats) RegionCreateOption {
 	return func(region *RegionInfo) {
 		region.queryStats = v
@@ -268,6 +273,7 @@ func SetQueryStats(v *pdpb.QueryStats) RegionCreateOption {
 
 // AddQueryStats sets the query stats for the region.
 // This func is only used for test and simulator.
+// It will preserve previous statistic.
 func AddQueryStats(v *pdpb.QueryStats) RegionCreateOption {
 	return func(region *RegionInfo) {
 		q := mergeQueryStat(region.queryStats, v)
@@ -469,3 +475,25 @@ func mergeQueryStat(q1, q2 *pdpb.QueryStats) *pdpb.QueryStats {
 	q2.Rollback += q1.Rollback
 	return q2
 }
+
+func resetReadQuery(q *pdpb.QueryStats) {
+	if q == nil {
+		return
+	}
+	q.Get = 0
+	q.Scan = 0
+	q.Coprocessor = 0
+}
+
+func resetWriteQuery(q *pdpb.QueryStats) {
+	if q == nil {
+		return
+	}
+	q.Put = 0
+	q.Delete = 0
+	q.DeleteRange = 0
+	q.AcquirePessimisticLock = 0
+	q.Rollback = 0
+	q.Prewrite = 0
+	q.Commit = 0
+}
diff --git a/server/api/region.go b/server/api/region.go
index cf3157f8834..dac92f247ca 100644
--- a/server/api/region.go
+++ b/server/api/region.go
@@ -545,6 +545,19 @@ func (h *regionsHandler) GetTopWriteFlowRegions(w http.ResponseWriter, r *http.R
 	h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() })
 }
 
+// @Tags region
+// @Summary List regions with the most write queries.
+// @Param limit query integer false "Limit count" default(16)
+// @Produce json
+// @Success 200 {object} response.RegionsInfo
+// @Failure 400 {string} string "The input is invalid."
+// @Router /regions/writequery [get]
+func (h *regionsHandler) GetTopWriteQueryRegions(w http.ResponseWriter, r *http.Request) {
+	h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
+		return a.GetWriteQueryNum() < b.GetWriteQueryNum()
+	})
+}
+
 // @Tags region
 // @Summary List regions with the highest read flow.
 // @Param limit query integer false "Limit count" default(16)
@@ -556,6 +569,19 @@ func (h *regionsHandler) GetTopReadFlowRegions(w http.ResponseWriter, r *http.Re
 	h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() })
 }
 
+// @Tags region
+// @Summary List regions with the most read queries.
+// @Param limit query integer false "Limit count" default(16)
+// @Produce json
+// @Success 200 {object} response.RegionsInfo
+// @Failure 400 {string} string "The input is invalid."
+// @Router /regions/readquery [get]
+func (h *regionsHandler) GetTopReadQueryRegions(w http.ResponseWriter, r *http.Request) {
+	h.GetTopNRegions(w, r, func(a, b *core.RegionInfo) bool {
+		return a.GetReadQueryNum() < b.GetReadQueryNum()
+	})
+}
+
 // @Tags region
 // @Summary List regions with the largest conf version.
 // @Param limit query integer false "Limit count" default(16)
diff --git a/server/api/router.go b/server/api/router.go
index 5ca8aac582d..553332e96af 100644
--- a/server/api/router.go
+++ b/server/api/router.go
@@ -257,7 +257,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	registerFunc(clusterRouter, "/regions/store/{id}", regionsHandler.GetStoreRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/keyspace/id/{id}", regionsHandler.GetKeyspaceRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/writeflow", regionsHandler.GetTopWriteFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
+	registerFunc(clusterRouter, "/regions/writequery", regionsHandler.GetTopWriteQueryRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/readflow", regionsHandler.GetTopReadFlowRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
+	registerFunc(clusterRouter, "/regions/readquery", regionsHandler.GetTopReadQueryRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/confver", regionsHandler.GetTopConfVerRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/version", regionsHandler.GetTopVersionRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/size", regionsHandler.GetTopSizeRegions, setMethods(http.MethodGet), setAuditBackend(prometheus))
diff --git a/tools/pd-ctl/pdctl/command/region_command.go b/tools/pd-ctl/pdctl/command/region_command.go
index e03de1c62ac..d7e19967c7a 100644
--- a/tools/pd-ctl/pdctl/command/region_command.go
+++ b/tools/pd-ctl/pdctl/command/region_command.go
@@ -37,6 +37,8 @@ var (
 	regionsCheckPrefix      = "pd/api/v1/regions/check"
 	regionsWriteFlowPrefix  = "pd/api/v1/regions/writeflow"
 	regionsReadFlowPrefix   = "pd/api/v1/regions/readflow"
+	regionsWriteQueryPrefix = "pd/api/v1/regions/writequery"
+	regionsReadQueryPrefix  = "pd/api/v1/regions/readquery"
 	regionsConfVerPrefix    = "pd/api/v1/regions/confver"
 	regionsVersionPrefix    = "pd/api/v1/regions/version"
 	regionsSizePrefix       = "pd/api/v1/regions/size"
@@ -66,17 +68,17 @@ func NewRegionCommand() *cobra.Command {
 	r.AddCommand(NewRangesWithRangeHolesCommand())
 
 	topRead := &cobra.Command{
-		Use:   `topread [--jq=""]`,
-		Short: "show regions with top read flow",
-		Run:   showRegionsTopCommand(regionsReadFlowPrefix),
+		Use:   `topread [byte|query] [--jq=""]`,
+		Short: "show regions with top read flow or query",
+		Run:   showTopReadRegions,
 	}
 	topRead.Flags().String("jq", "", "jq query")
 	r.AddCommand(topRead)
 
 	topWrite := &cobra.Command{
-		Use:   `topwrite [--jq=""]`,
-		Short: "show regions with top write flow",
-		Run:   showRegionsTopCommand(regionsWriteFlowPrefix),
+		Use:   `topwrite [byte|query] [--jq=""]`,
+		Short: "show regions with top write flow or query",
+		Run:   showTopWriteRegions,
 	}
 	topWrite.Flags().String("jq", "", "jq query")
 	r.AddCommand(topWrite)
@@ -212,6 +214,9 @@ func showRegionsTopCommand(prefix string) run {
 			return
 		}
 		prefix += "?limit=" + args[0]
+	} else if len(args) > 1 {
+		cmd.Println(cmd.UsageString())
+		return
 	}
 	r, err := doRequest(cmd, prefix, http.MethodGet, http.Header{})
 	if err != nil {
@@ -226,6 +231,40 @@ func showRegionsTopCommand(prefix string) run {
 	}
 }
 
+func showTopReadRegions(cmd *cobra.Command, args []string) {
+	// default to show top read flow
+	if len(args) == 0 {
+		showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args)
+		return
+	}
+	// the first argument selects the dimension; a bare limit still means read flow
+	switch args[0] {
+	case "query":
+		showRegionsTopCommand(regionsReadQueryPrefix)(cmd, args[1:])
+	case "byte":
+		showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args[1:])
+	default:
+		showRegionsTopCommand(regionsReadFlowPrefix)(cmd, args)
+	}
+}
+
+func showTopWriteRegions(cmd *cobra.Command, args []string) {
+	// default to show top write flow
+	if len(args) == 0 {
+		showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args)
+		return
+	}
+	// the first argument selects the dimension; a bare limit still means write flow
+	switch args[0] {
+	case "query":
+		showRegionsTopCommand(regionsWriteQueryPrefix)(cmd, args[1:])
+	case "byte":
+		showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args[1:])
+	default:
+		showRegionsTopCommand(regionsWriteFlowPrefix)(cmd, args)
+	}
+}
+
 // NewRegionWithKeyCommand return a region with key subcommand of regionCmd
 func NewRegionWithKeyCommand() *cobra.Command {
 	r := &cobra.Command{
diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go
index 07a7c2aa990..c6430789cfc 100644
--- a/tools/pd-ctl/tests/config/config_test.go
+++ b/tools/pd-ctl/tests/config/config_test.go
@@ -1144,6 +1144,34 @@ func (suite *configTestSuite) checkMicroServiceConfig(cluster *pdTests.TestClust
 	re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback)
 }
 
+func (suite *configTestSuite) TestRegionRules() {
+	suite.env.RunTestInTwoModes(suite.checkRegionRules)
+}
+
+func (suite *configTestSuite) checkRegionRules(cluster *pdTests.TestCluster) {
+	re := suite.Require()
+	leaderServer := cluster.GetLeaderServer()
+	pdAddr := leaderServer.GetAddr()
+	cmd := ctl.GetRootCmd()
+
+	storeID, regionID := uint64(1), uint64(2)
+	store := &metapb.Store{
+		Id:    storeID,
+		State: metapb.StoreState_Up,
+	}
+	pdTests.MustPutStore(re, cluster, store)
+	pdTests.MustPutRegion(re, cluster, regionID, storeID, []byte{}, []byte{})
+
+	args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--region=" + strconv.Itoa(int(regionID)), "--detail"}
+	output, err := tests.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	fit := &placement.RegionFit{}
+	re.NoError(json.Unmarshal(output, fit))
+	re.Len(fit.RuleFits, 1)
+	re.Equal(placement.DefaultGroupID, fit.RuleFits[0].Rule.GroupID)
+	re.Equal(placement.DefaultRuleID, fit.RuleFits[0].Rule.ID)
+}
+
 func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
 	re.Len(b, len(a))
 	for i := 0; i < len(a); i++ {
diff --git a/tools/pd-ctl/tests/helper.go b/tools/pd-ctl/tests/helper.go
index 90626afe5c0..bdacae48c22 100644
--- a/tools/pd-ctl/tests/helper.go
+++ b/tools/pd-ctl/tests/helper.go
@@ -82,3 +82,12 @@ func CheckRegionsInfo(re *require.Assertions, output *response.RegionsInfo, expe
 		CheckRegionInfo(re, &got[i], region)
 	}
 }
+
+// CheckRegionsInfoWithoutSort is used to check the test results without sorting.
+func CheckRegionsInfoWithoutSort(re *require.Assertions, output *response.RegionsInfo, expected []*core.RegionInfo) {
+	re.Len(expected, output.Count)
+	got := output.Regions
+	for i, region := range expected {
+		CheckRegionInfo(re, &got[i], region)
+	}
+}
diff --git a/tools/pd-ctl/tests/region/region_test.go b/tools/pd-ctl/tests/region/region_test.go
index 5df30520115..b328fd88286 100644
--- a/tools/pd-ctl/tests/region/region_test.go
+++ b/tools/pd-ctl/tests/region/region_test.go
@@ -83,6 +83,7 @@ func TestRegion(t *testing.T) {
 	r1 := pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"),
 		core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1),
 		core.SetRegionVersion(1), core.SetApproximateSize(1), core.SetApproximateKeys(100),
+		core.SetReadQuery(100), core.SetWrittenQuery(100),
 		core.SetPeers([]*metapb.Peer{
 			{Id: 1, StoreId: 1},
 			{Id: 5, StoreId: 2},
@@ -92,15 +93,18 @@ func TestRegion(t *testing.T) {
 	r2 := pdTests.MustPutRegion(re, cluster, 2, 1, []byte("b"), []byte("c"),
 		core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2),
 		core.SetRegionVersion(3), core.SetApproximateSize(144), core.SetApproximateKeys(14400),
+		core.SetReadQuery(200), core.SetWrittenQuery(200),
 	)
 	r3 := pdTests.MustPutRegion(re, cluster, 3, 1, []byte("c"), []byte("d"),
 		core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3),
 		core.SetRegionVersion(2), core.SetApproximateSize(30), core.SetApproximateKeys(3000),
+		core.SetReadQuery(300), core.SetWrittenQuery(300),
 		core.WithDownPeers([]*pdpb.PeerStats{{Peer: downPeer, DownSeconds: 3600}}),
 		core.WithPendingPeers([]*metapb.Peer{downPeer}),
 		core.WithLearners([]*metapb.Peer{{Id: 3, StoreId: 1}}))
 	r4 := pdTests.MustPutRegion(re, cluster, 4, 1, []byte("d"), []byte("e"),
-		core.SetWrittenBytes(100), core.SetReadBytes(100), core.SetRegionConfVer(1),
-		core.SetRegionVersion(1), core.SetApproximateSize(10), core.SetApproximateKeys(1000),
+		core.SetWrittenBytes(100), core.SetReadBytes(100), core.SetRegionConfVer(4),
+		core.SetRegionVersion(4), core.SetApproximateSize(10), core.SetApproximateKeys(1000),
+		core.SetReadQuery(400), core.SetWrittenQuery(400),
 	)
 	defer cluster.Destroy()
 
@@ -115,26 +119,6 @@ func TestRegion(t *testing.T) {
 		// region store command
 		{[]string{"region", "store", "1"}, leaderServer.GetStoreRegions(1)},
 		{[]string{"region", "store", "1"}, []*core.RegionInfo{r1, r2, r3, r4}},
-		// region topread [limit] command
-		{[]string{"region", "topread", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
-		// region topwrite [limit] command
-		{[]string{"region", "topwrite", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
-		// region topconfver [limit] command
-		{[]string{"region", "topconfver", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
-			return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
-		}, 2)},
-		// region topversion [limit] command
-		{[]string{"region", "topversion", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
-			return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion()
-		}, 2)},
-		// region topsize [limit] command
-		{[]string{"region", "topsize", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
-			return a.GetApproximateSize() < b.GetApproximateSize()
-		}, 2)},
-		// region topkeys [limit] command
-		{[]string{"region", "topkeys", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
-			return a.GetApproximateKeys() < b.GetApproximateKeys()
-		}, 2)},
 		// region check extra-peer command
 		{[]string{"region", "check", "extra-peer"}, []*core.RegionInfo{r1}},
 		// region check miss-peer command
@@ -172,10 +156,65 @@ func TestRegion(t *testing.T) {
 		output, err := tests.ExecuteCommand(cmd, args...)
 		re.NoError(err)
 		regions := &response.RegionsInfo{}
-		re.NoError(json.Unmarshal(output, regions))
+		re.NoError(json.Unmarshal(output, regions), string(output))
 		tests.CheckRegionsInfo(re, regions, testCase.expect)
 	}
 
+	testRegionsCases = []struct {
+		args   []string
+		expect []*core.RegionInfo
+	}{
+		// region topread [limit] command
+		{[]string{"region", "topread"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 4)},
+		// region topwrite [limit] command
+		{[]string{"region", "topwrite"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 4)},
+		// region topread [limit] command
+		{[]string{"region", "topread", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
+		// region topwrite [limit] command
+		{[]string{"region", "topwrite", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
+		// region topread byte [limit] command
+		{[]string{"region", "topread", "byte"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 4)},
+		// region topwrite byte [limit] command
+		{[]string{"region", "topwrite", "byte"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 4)},
+		// region topread query [limit] command
+		{[]string{"region", "topread", "query"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetReadQueryNum() < b.GetReadQueryNum() }, 4)},
+		// region topwrite query [limit] command
+		{[]string{"region", "topwrite", "query"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetWriteQueryNum() < b.GetWriteQueryNum() }, 4)},
+		// region topread byte [limit] command
+		{[]string{"region", "topread", "byte", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesRead() < b.GetBytesRead() }, 2)},
+		// region topwrite byte [limit] command
+		{[]string{"region", "topwrite", "byte", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetBytesWritten() < b.GetBytesWritten() }, 2)},
+		// region topread query [limit] command
+		{[]string{"region", "topread", "query", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetReadQueryNum() < b.GetReadQueryNum() }, 2)},
+		// region topwrite query [limit] command
+		{[]string{"region", "topwrite", "query", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool { return a.GetWriteQueryNum() < b.GetWriteQueryNum() }, 2)},
+		// region topconfver [limit] command
+		{[]string{"region", "topconfver", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
+			return a.GetMeta().GetRegionEpoch().GetConfVer() < b.GetMeta().GetRegionEpoch().GetConfVer()
+		}, 2)},
+		// region topversion [limit] command
+		{[]string{"region", "topversion", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
+			return a.GetMeta().GetRegionEpoch().GetVersion() < b.GetMeta().GetRegionEpoch().GetVersion()
+		}, 2)},
+		// region topsize [limit] command
+		{[]string{"region", "topsize", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
+			return a.GetApproximateSize() < b.GetApproximateSize()
+		}, 2)},
+		// region topkeys [limit] command
+		{[]string{"region", "topkeys", "2"}, api.TopNRegions(leaderServer.GetRegions(), func(a, b *core.RegionInfo) bool {
+			return a.GetApproximateKeys() < b.GetApproximateKeys()
+		}, 2)},
+	}
+
+	for _, testCase := range testRegionsCases {
+		args := append([]string{"-u", pdAddr}, testCase.args...)
+		output, err := tests.ExecuteCommand(cmd, args...)
+		re.NoError(err)
+		regions := &response.RegionsInfo{}
+		re.NoError(json.Unmarshal(output, regions), string(output))
+		tests.CheckRegionsInfoWithoutSort(re, regions, testCase.expect)
+	}
+
 	var testRegionCases = []struct {
 		args   []string
 		expect *core.RegionInfo

From 33ae3b614e46fe0b3a1a94a62b3944883129c829 Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Fri, 12 Apr 2024 12:34:53 +0800
Subject: [PATCH 2/5] scheduler: use move-hot-write-leader operator (#7852)

close tikv/pd#7848

Signed-off-by: lhy1024
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 metrics/grafana/pd.json                    | 15 +++++
 pkg/schedule/schedulers/hot_region.go      | 45 ++-------
 pkg/schedule/schedulers/hot_region_test.go | 57 +++++++-------
 pkg/utils/operatorutil/operator_check.go   | 66 ++++++++--------
 4 files changed, 96 insertions(+), 87 deletions(-)

diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json
index 89f2828757f..9941004e0c2 100644
--- a/metrics/grafana/pd.json
+++ b/metrics/grafana/pd.json
@@ -5881,6 +5881,21 @@
             "intervalFactor": 1,
             "legendFormat": "store-{{store}}-in",
             "refId": "B"
+          },
+          {
+            "expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
+            "format": "time_series",
+            "intervalFactor": 2,
+            "legendFormat": "store-{{store}}-out",
+            "refId": "C",
+            "step": 4
+          },
+          {
+            "expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)",
+            "format": "time_series",
+            "intervalFactor": 1,
+            "legendFormat": "store-{{store}}-in",
+            "refId": "D"
           }
         ],
         "thresholds": [],
diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go
index b6293c2dac9..b4e904c1481 100644
--- a/pkg/schedule/schedulers/hot_region.go
+++ b/pkg/schedule/schedulers/hot_region.go
@@ -1546,20 +1546,12 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
 	targetLabel := strconv.FormatUint(dstStoreID, 10)
 	dim := bs.rankToDimString()
 
-	var createOperator func(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error)
-	switch bs.rwTy {
-	case utils.Read:
-		createOperator = bs.createReadOperator
-	case utils.Write:
-		createOperator = bs.createWriteOperator
-	}
-
-	currentOp, typ, err := createOperator(bs.cur.region, srcStoreID, dstStoreID)
+	currentOp, typ, err := bs.createOperator(bs.cur.region, srcStoreID, dstStoreID)
 	if err == nil {
 		bs.decorateOperator(currentOp, false, sourceLabel, targetLabel, typ, dim)
 		ops = []*operator.Operator{currentOp}
 		if bs.cur.revertRegion != nil {
-			currentOp, typ, err = createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
+			currentOp, typ, err = bs.createOperator(bs.cur.revertRegion, dstStoreID, srcStoreID)
 			if err == nil {
 				bs.decorateOperator(currentOp, true, targetLabel, sourceLabel, typ, dim)
 				ops = append(ops, currentOp)
@@ -1725,11 +1717,11 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, strateg
 	return operators
 }
 
-func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
+func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
 	if region.GetStorePeer(dstStoreID) != nil {
 		typ = "transfer-leader"
 		op, err = operator.CreateTransferLeaderOperator(
-			"transfer-hot-read-leader",
+			"transfer-hot-"+bs.rwTy.String()+"-leader",
 			bs,
 			region,
 			dstStoreID,
@@ -1741,7 +1733,7 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID,
 		if region.GetLeader().GetStoreId() == srcStoreID {
 			typ = "move-leader"
 			op, err = operator.CreateMoveLeaderOperator(
-				"move-hot-read-leader",
+				"move-hot-"+bs.rwTy.String()+"-leader",
 				bs,
 				region,
 				operator.OpHotRegion,
@@ -1750,7 +1742,7 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID,
 		} else {
 			typ = "move-peer"
 			op, err = operator.CreateMovePeerOperator(
-				"move-hot-read-peer",
+				"move-hot-"+bs.rwTy.String()+"-peer",
 				bs,
 				region,
 				operator.OpHotRegion,
@@ -1761,31 +1753,6 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID,
 	return
 }
 
-func (bs *balanceSolver) createWriteOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
-	if region.GetStorePeer(dstStoreID) != nil {
-		typ = "transfer-leader"
-		op, err = operator.CreateTransferLeaderOperator(
-			"transfer-hot-write-leader",
-			bs,
-			region,
-			dstStoreID,
-			[]uint64{},
-			operator.OpHotRegion)
-	} else {
-		srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
-		dstPeer := &metapb.Peer{StoreId: dstStoreID, Role: srcPeer.Role}
-		typ = "move-peer"
-		op, err = operator.CreateMovePeerOperator(
-			"move-hot-write-peer",
-			bs,
-			region,
-			operator.OpHotRegion,
-			srcStoreID,
-			dstPeer)
-	}
-	return
-}
-
 func (bs *balanceSolver) decorateOperator(op *operator.Operator, isRevert bool, sourceLabel, targetLabel, typ, dim string) {
 	op.SetPriorityLevel(constant.High)
 	op.FinishedCounters = append(op.FinishedCounters,
diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go
index 5f6cca892ee..cfc5196909f 100644
--- a/pkg/schedule/schedulers/hot_region_test.go
+++ b/pkg/schedule/schedulers/hot_region_test.go
@@ -70,6 +70,11 @@ func clearPendingInfluence(h *hotScheduler) {
 	h.regionPendings = make(map[uint64]*pendingInfluence)
 }
 
+func newTestRegion(id uint64) *core.RegionInfo {
+	peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
+	return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
+}
+
 func TestUpgrade(t *testing.T) {
 	re := require.New(t)
 	cancel, _, _, oc := prepareSchedulersTest()
@@ -193,20 +198,6 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
 	}
 }
 
-func newTestRegion(id uint64) *core.RegionInfo {
-	peers := []*metapb.Peer{{Id: id*100 + 1, StoreId: 1}, {Id: id*100 + 2, StoreId: 2}, {Id: id*100 + 3, StoreId: 3}}
-	return core.NewRegionInfo(&metapb.Region{Id: id, Peers: peers}, peers[0])
-}
-
-func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
-	re := require.New(t)
-	statistics.Denoising = false
-	statisticsInterval = 0
-	checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
-	checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
-	checkHotWriteRegionPlacement(re, true)
-}
-
 func TestSplitIfRegionTooHot(t *testing.T) {
 	re := require.New(t)
 	statistics.Denoising = false
@@ -395,6 +386,15 @@ func TestSplitBucketsByLoad(t *testing.T) {
 	}
 }
 
+func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
+	re := require.New(t)
+	statistics.Denoising = false
+	statisticsInterval = 0
+	checkHotWriteRegionScheduleByteRateOnly(re, false /* disable placement rules */)
+	checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
+	checkHotWriteRegionPlacement(re, true)
+}
+
 func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules bool) {
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
@@ -446,14 +446,13 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
 	tc.RuleManager.DeleteRule("pd", "follower")
 	ops, _ = hb.Schedule(tc, false)
 	re.NotEmpty(ops)
-	// TODO: fix the test
-	// re.NotContains(ops[0].Step(1).String(), "transfer leader")
+	re.NotContains(ops[0].Step(1).String(), "transfer leader")
 }
 
 func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
 	cancel, opt, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
+	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
 	tc.SetEnablePlacementRules(enablePlacementRules)
 	labels := []string{"zone", "host"}
 	tc.SetMaxReplicasWithLabel(enablePlacementRules, 3, labels...)
@@ -511,12 +510,14 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
 		switch op.Len() {
 		case 1:
 			// balance by leader selected
+			re.Equal("transfer-hot-write-leader", op.Desc())
 			operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
-		case 4:
+		case 5:
 			// balance by peer selected
+			re.Equal("move-hot-write-leader", op.Desc())
 			if op.RegionID() == 2 {
 				// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
-				operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
+				operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
 			} else {
 				// peer in store 1 of the region 1,3 can only transfer to store 6
 				operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
@@ -536,10 +537,10 @@ func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlace
 		ops, _ := hb.Schedule(tc, false)
 		op := ops[0]
 		clearPendingInfluence(hb.(*hotScheduler))
-		re.Equal(4, op.Len())
+		re.Equal(5, op.Len())
 		if op.RegionID() == 2 {
 			// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
-			operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
+			operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
 		} else {
 			// peer in store 1 of the region 1,3 can only transfer to store 6
 			operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
@@ -636,7 +637,7 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
 	statisticsInterval = 0
 	cancel, _, tc, oc := prepareSchedulersTest()
 	defer cancel()
-	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
+	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.ConfChangeV2))
 	tc.SetHotRegionCacheHitsThreshold(0)
 	re.NoError(tc.RuleManager.SetRules([]*placement.Rule{
 		{
@@ -730,9 +731,11 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
 		switch op.Len() {
 		case 1:
 			// balance by leader selected
+			re.Equal("transfer-hot-write-leader", op.Desc())
 			operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
 		case 2:
 			// balance by peer selected
+			re.Equal("move-hot-write-leader", op.Desc())
 			operatorutil.CheckTransferLearner(re, op, operator.OpHotRegion, 8, 10)
 		default:
 			re.FailNow("wrong op: " + op.String())
@@ -823,12 +826,14 @@ func TestHotWriteRegionScheduleByteRateOnlyWithTiFlash(t *testing.T) {
 		switch op.Len() {
 		case 1:
 			// balance by leader selected
+			re.Equal("transfer-hot-write-leader", op.Desc())
 			operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
-		case 4:
+		case 5:
 			// balance by peer selected
+			re.Equal("move-hot-write-leader", op.Desc())
 			if op.RegionID() == 2 {
 				// peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label
-				operatorutil.CheckTransferPeerWithLeaderTransferFrom(re, op, operator.OpHotRegion, 1)
+				operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 0)
 			} else {
 				// peer in store 1 of the region 1,3 can only transfer to store 6
 				operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 6)
@@ -1164,9 +1169,11 @@ func checkHotWriteRegionScheduleWithPendingInfluence(re *require.Assertions, dim
 		switch op.Len() {
 		case 1:
 			// balance by leader selected
+			re.Equal("transfer-hot-write-leader", op.Desc())
 			operatorutil.CheckTransferLeaderFrom(re, op, operator.OpHotRegion, 1)
-		case 4:
+		case 5:
 			// balance by peer selected
+			re.Equal("move-hot-write-leader", op.Desc())
 			operatorutil.CheckTransferPeerWithLeaderTransfer(re, op, operator.OpHotRegion, 1, 4)
 			cnt++
 			if cnt == 3 {
diff --git a/pkg/utils/operatorutil/operator_check.go b/pkg/utils/operatorutil/operator_check.go
index 5d366cf8fb9..61efd84ef1a 100644
--- a/pkg/utils/operatorutil/operator_check.go
+++ b/pkg/utils/operatorutil/operator_check.go
@@ -65,11 +65,27 @@ func trimTransferLeaders(op *operator.Operator) (steps []operator.OpStep, lastLe
 // CheckTransferPeer checks if the operator is to transfer peer between the specified source and target stores.
 func CheckTransferPeer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
 	re.NotNil(op)
+	var addLearnerTo, removePeerFrom uint64
 	steps, _ := trimTransferLeaders(op)
-	re.Len(steps, 3)
-	re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
-	re.IsType(operator.PromoteLearner{}, steps[1])
-	re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
+	switch len(steps) {
+	case 3: // without joint consensus
+		re.IsType(operator.AddLearner{}, steps[0])
+		re.IsType(operator.PromoteLearner{}, steps[1])
+		re.IsType(operator.RemovePeer{}, steps[2])
+		addLearnerTo = steps[0].(operator.AddLearner).ToStore
+		removePeerFrom = steps[2].(operator.RemovePeer).FromStore
+	case 4: // with joint consensus
+		re.IsType(operator.AddLearner{}, steps[0])
+		re.IsType(operator.ChangePeerV2Enter{}, steps[1])
+		re.IsType(operator.ChangePeerV2Leave{}, steps[2])
+		re.IsType(operator.RemovePeer{}, steps[3])
+		addLearnerTo = steps[0].(operator.AddLearner).ToStore
+		removePeerFrom = steps[3].(operator.RemovePeer).FromStore
+	default:
+		re.FailNow("unexpected operator steps")
+	}
+	re.Equal(sourceID, removePeerFrom)
+	re.Equal(targetID, addLearnerTo)
 	kind |= operator.OpRegion
 	re.Equal(kind, op.Kind()&kind)
 }
@@ -88,32 +104,36 @@ func CheckTransferLearner(re *require.Assertions, op *operator.Operator, kind op
 // CheckTransferPeerWithLeaderTransfer checks if the operator is to transfer
 // peer between the specified source and target stores and it meanwhile
 // transfers the leader out of source store.
+// If targetID is 0, it means the operator is to transfer peer to any store.
 func CheckTransferPeerWithLeaderTransfer(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID, targetID uint64) {
 	re.NotNil(op)
+	var addLearnerTo, removePeerFrom uint64
 	steps, lastLeader := trimTransferLeaders(op)
-	re.Len(steps, 3)
-	re.Equal(targetID, steps[0].(operator.AddLearner).ToStore)
-	re.IsType(operator.PromoteLearner{}, steps[1])
-	re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
+	switch len(steps) {
+	case 3: // without joint consensus
+		re.IsType(operator.AddLearner{}, steps[0])
+		re.IsType(operator.PromoteLearner{}, steps[1])
+		re.IsType(operator.RemovePeer{}, steps[2])
+		addLearnerTo = steps[0].(operator.AddLearner).ToStore
+		removePeerFrom = steps[2].(operator.RemovePeer).FromStore
+	case 4: // with joint consensus
+		re.IsType(operator.AddLearner{}, steps[0])
+		re.IsType(operator.ChangePeerV2Enter{}, steps[1])
+		re.IsType(operator.ChangePeerV2Leave{}, steps[2])
+		re.IsType(operator.RemovePeer{}, steps[3])
+		addLearnerTo = steps[0].(operator.AddLearner).ToStore
+		removePeerFrom = steps[3].(operator.RemovePeer).FromStore
+	default:
+		re.FailNow("unexpected operator steps")
+	}
 	re.NotZero(lastLeader)
 	re.NotEqual(sourceID, lastLeader)
 	kind |= operator.OpRegion
 	re.Equal(kind, op.Kind()&kind)
-}
-
-// CheckTransferPeerWithLeaderTransferFrom checks if the operator is to transfer
-// peer out of the specified store and it meanwhile transfers the leader out of
-// the store.
-func CheckTransferPeerWithLeaderTransferFrom(re *require.Assertions, op *operator.Operator, kind operator.OpKind, sourceID uint64) {
-	re.NotNil(op)
-	steps, lastLeader := trimTransferLeaders(op)
-	re.IsType(operator.AddLearner{}, steps[0])
-	re.IsType(operator.PromoteLearner{}, steps[1])
-	re.Equal(sourceID, steps[2].(operator.RemovePeer).FromStore)
-	re.NotZero(lastLeader)
-	re.NotEqual(sourceID, lastLeader)
-	kind |= operator.OpRegion | operator.OpLeader
-	re.Equal(kind, op.Kind()&kind)
+	re.Equal(sourceID, removePeerFrom)
+	if targetID != 0 {
+		re.Equal(targetID, addLearnerTo)
+	}
 }
 
 // CheckAddPeer checks if the operator is to add peer on specified store.
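A quick reference for the reworked checks above, not part of the commit: once trimTransferLeaders strips the leader-transfer steps, the helpers accept exactly two step shapes, and passing targetID = 0 to CheckTransferPeerWithLeaderTransfer pins only the source store. A standalone, illustrative Go sketch of those shapes (the step names mirror the operator types asserted in operator_check.go):

    package main

    import "fmt"

    // The two step sequences accepted after transfer-leader steps are
    // trimmed: three steps on clusters without joint consensus, four
    // steps with it.
    func main() {
    	shapes := map[string][]string{
    		"without joint consensus": {"AddLearner", "PromoteLearner", "RemovePeer"},
    		"with joint consensus":    {"AddLearner", "ChangePeerV2Enter", "ChangePeerV2Leave", "RemovePeer"},
    	}
    	for mode, steps := range shapes {
    		fmt.Println(mode, "->", steps)
    	}
    }

This is also consistent with the hot write tests above now expecting op.Len() == 5 rather than 4: four joint-consensus peer steps plus the leader transfer that the move-hot-write-leader operator adds.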
From 5da6def96a3094385fdfd6d037a74845aaf51e4e Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Fri, 12 Apr 2024 12:51:23 +0800
Subject: [PATCH 3/5] pd-ctl: hidden some hot scheduler config (#7892)

ref tikv/pd#5691

Signed-off-by: lhy1024
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 tools/pd-ctl/pdctl/command/scheduler.go      | 29 +++++++++++++
 .../pd-ctl/tests/scheduler/scheduler_test.go | 35 +++++++-------
 2 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go
index d5deba670ad..c1db24cc176 100644
--- a/tools/pd-ctl/pdctl/command/scheduler.go
+++ b/tools/pd-ctl/pdctl/command/scheduler.go
@@ -649,6 +649,18 @@ func addStoreToSchedulerConfig(cmd *cobra.Command, schedulerName string, args []
 	postJSON(cmd, path.Join(schedulerConfigPrefix, schedulerName, "config"), input)
 }
 
+var hiddenHotConfig = []string{
+	"max-zombie-rounds",
+	"max-peer-number",
+	"byte-rate-rank-step-ratio",
+	"key-rate-rank-step-ratio",
+	"query-rate-rank-step-ratio",
+	"count-rank-step-ratio",
+	"great-dec-ratio",
+	"minor-dec-ratio",
+	"enable-for-tiflash",
+}
+
 func listSchedulerConfigCommandFunc(cmd *cobra.Command, args []string) {
 	if len(args) != 0 {
 		cmd.Println(cmd.UsageString())
@@ -667,6 +679,23 @@ func listSchedulerConfigCommandFunc(cmd *cobra.Command, args []string) {
 		cmd.Println(err)
 		return
 	}
+	if p == "balance-hot-region-scheduler" {
+		schedulerConfig := make(map[string]any)
+		err := json.Unmarshal([]byte(r), &schedulerConfig)
+		if err != nil {
+			cmd.Println(err)
+			return
+		}
+		for _, config := range hiddenHotConfig {
+			delete(schedulerConfig, config)
+		}
+		b, err := json.MarshalIndent(schedulerConfig, "", "  ")
+		if err != nil {
+			cmd.Println(err)
+			return
+		}
+		r = string(b)
+	}
 	cmd.Println(r)
 }
 
diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go
index b5a2128752b..00fab12b99b 100644
--- a/tools/pd-ctl/tests/scheduler/scheduler_test.go
+++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go
@@ -386,28 +386,19 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) {
 
 	// test hot region config
 	expected1 := map[string]any{
-		"min-hot-byte-rate":          float64(100),
-		"min-hot-key-rate":           float64(10),
-		"min-hot-query-rate":         float64(10),
-		"max-zombie-rounds":          float64(3),
-		"max-peer-number":            float64(1000),
-		"byte-rate-rank-step-ratio":  0.05,
-		"key-rate-rank-step-ratio":   0.05,
-		"query-rate-rank-step-ratio": 0.05,
-		"count-rank-step-ratio":      0.01,
-		"great-dec-ratio":            0.95,
-		"minor-dec-ratio":            0.99,
-		"src-tolerance-ratio":        1.05,
-		"dst-tolerance-ratio":        1.05,
-		"read-priorities":            []any{"byte", "key"},
-		"write-leader-priorities":    []any{"key", "byte"},
-		"write-peer-priorities":      []any{"byte", "key"},
-		"strict-picking-store":       "true",
-		"enable-for-tiflash":         "true",
-		"rank-formula-version":       "v2",
-		"split-thresholds":           0.2,
-		"history-sample-duration":    "5m0s",
-		"history-sample-interval":    "30s",
+		"min-hot-byte-rate":       float64(100),
+		"min-hot-key-rate":        float64(10),
+		"min-hot-query-rate":      float64(10),
+		"src-tolerance-ratio":     1.05,
+		"dst-tolerance-ratio":     1.05,
+		"read-priorities":         []any{"byte", "key"},
+		"write-leader-priorities": []any{"key", "byte"},
+		"write-peer-priorities":   []any{"byte", "key"},
+		"strict-picking-store":    "true",
+		"rank-formula-version":    "v2",
+		"split-thresholds":        0.2,
+		"history-sample-duration": "5m0s",
+		"history-sample-interval": "30s",
 	}
 	checkHotSchedulerConfig := func(expect map[string]any) {
 		testutil.Eventually(re, func() bool {

From c01d8fd76091d5591bd72282a82ead7204a258ba Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Fri, 12 Apr 2024 16:27:24 +0800
Subject: [PATCH 4/5] *: make `TestTSOKeyspaceGroupSplit` stable (#8059)

close tikv/pd#7038

Signed-off-by: Ryan Leung
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 tests/integrations/mcs/tso/keyspace_group_manager_test.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index f678e901419..9194811cd37 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -277,7 +277,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
 	})
 	// Wait for the split to complete automatically even there is no TSO request from the outside.
 	testutil.Eventually(re, func() bool {
-		kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
+		kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
+		if code != http.StatusOK {
+			return false
+		}
 		re.Equal(uint32(2), kg2.ID)
 		re.Equal([]uint32{222, 333}, kg2.Keyspaces)
 		return !kg2.IsSplitting()

From 340c58c02d15fbcf0a529da30c214dea3ed3a3dc Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 15 Apr 2024 10:58:35 +0800
Subject: [PATCH 5/5] *: fix client log (#8060)

ref tikv/pd#4399

Signed-off-by: Ryan Leung
---
 client/tso_service_discovery.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go
index 34ef16f88b0..f88403bb322 100644
--- a/client/tso_service_discovery.go
+++ b/client/tso_service_discovery.go
@@ -465,7 +465,7 @@ func (c *tsoServiceDiscovery) updateMember() error {
 	oldGroupID := c.GetKeyspaceGroupID()
 	if oldGroupID != keyspaceGroup.Id {
 		log.Info("[tso] the keyspace group changed",
-			zap.Uint32("keyspace-id", keyspaceGroup.Id),
+			zap.Uint32("keyspace-id", keyspaceID),
 			zap.Uint32("new-keyspace-group-id", keyspaceGroup.Id),
 			zap.Uint32("old-keyspace-group-id", oldGroupID))
 	}
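To exercise the top-query support from [PATCH 1/5] end to end, a throwaway Go client against the newly registered routes is enough; this is a minimal sketch, and the PD address and limit below are placeholders, not values taken from the series:

    package main

    import (
    	"fmt"
    	"io"
    	"net/http"
    )

    func main() {
    	base := "http://127.0.0.1:2379" // placeholder PD address, adjust to your cluster
    	// Routes registered in server/api/router.go by this series.
    	for _, path := range []string{
    		"/pd/api/v1/regions/readquery?limit=2",
    		"/pd/api/v1/regions/writequery?limit=2",
    	} {
    		resp, err := http.Get(base + path)
    		if err != nil {
    			fmt.Println(err)
    			continue
    		}
    		body, _ := io.ReadAll(resp.Body)
    		resp.Body.Close()
    		fmt.Printf("GET %s -> %d bytes\n", path, len(body))
    	}
    }

The same data is reachable through pd-ctl as `region topread query [limit]` and `region topwrite query [limit]`, which is what the new region_test.go cases exercise.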