schedule: add metrics for region scatter (#3582) (#3596)
ti-srebot authored Apr 16, 2021
1 parent a811014 commit 3dd3c7b
Showing 4 changed files with 267 additions and 5 deletions.
214 changes: 214 additions & 0 deletions metrics/grafana/pd.json
@@ -6853,6 +6853,220 @@
"title": "Scheduler",
"type": "row"
},
{
"collapsed": true,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
"y": 19
},
"id": 1437,
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "tidb-cluster",
"description": "",
"fill": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 20
},
"id": 1433,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(delta(pd_schedule_scatter_operators_count{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\", type=\"skip\"}[1m])) by (event)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "skip-{{event}}",
"refId": "A"
},
{
"expr": "delta(pd_schedule_scatter_operators_count{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\", type=\"fail\"}[1m])",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "fail",
"refId": "B"
},
{
"expr": "delta(pd_schedule_scatter_operators_count{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\", type=\"success\"}[1m])",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "success",
"refId": "C"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "scatter operator event",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "opm",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "tidb-cluster",
"fill": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 20
},
"id": 1435,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(delta(pd_schedule_scatter_distribution{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\",engine=\"tikv\",is_leader=\"false\"}[1m])) by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "peer-{{store}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_schedule_scatter_distribution{tidb_cluster=\"$tidb_cluster\", instance=\"$instance\",engine=\"tikv\",is_leader=\"true\"}[1m])) by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "leader-{{store}}",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "scatter store selection",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "opm",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Scatter and Splitter",
"type": "row"
},
{
"collapsed": true,
"gridPos": {
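The two new panels read back the counters added by this commit: "scatter operator event" charts per-minute deltas of pd_schedule_scatter_operators_count (skip reasons, failures, successes), and "scatter store selection" charts pd_schedule_scatter_distribution per store, split into leader and peer series. The same expressions can be issued against Prometheus directly; below is a minimal sketch using the client_golang query API, assuming a Prometheus server that scrapes PD is reachable at 127.0.0.1:9090 (the dashboard's $tidb_cluster and $instance template variables are dropped here).

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	// The address is an assumption: any Prometheus server that scrapes PD will do.
	client, err := api.NewClient(api.Config{Address: "http://127.0.0.1:9090"})
	if err != nil {
		panic(err)
	}
	promAPI := v1.NewAPI(client)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Same shape as the "scatter operator event" panel: per-minute skip counts grouped by reason.
	query := `sum(delta(pd_schedule_scatter_operators_count{type="skip"}[1m])) by (event)`
	result, warnings, err := promAPI.Query(ctx, query, time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	fmt.Println(result)
}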
4 changes: 3 additions & 1 deletion server/schedule/filter/filters.go
@@ -389,7 +389,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, opt *config.PersistOptions
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected, f.isBusy,
f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
case scatterRegionTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected}
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected, f.isBusy}
}
for _, cf := range funcs {
if cf(opt, store) {
@@ -686,6 +686,8 @@ const (
EngineKey = "engine"
// EngineTiFlash is the tiflash value of the engine label.
EngineTiFlash = "tiflash"
// EngineTiKV indicates the tikv engine in metrics
EngineTiKV = "tikv"
)

var allSpecialUses = []string{SpecialUseHotRegion, SpecialUseReserved}
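Two things change in filters.go: scatter targets are now rejected when the store is busy (the condition list for scatterRegionTarget gains f.isBusy, and the inline !store.IsBusy() check in selectCandidates further down is dropped), and an EngineTiKV constant is added for the new metric label. The following is a simplified, self-contained sketch of the any-condition-matches pattern used by StoreStateFilter, with toy types standing in for PD's config.PersistOptions and core.StoreInfo; the real PD signatures differ.

package main

import "fmt"

// Toy stand-ins; PD's real option and store types are richer.
type options struct{}
type store struct {
	down, disconnected, busy bool
}

// conditionFunc is a predicate that returns true when the store must be rejected.
type conditionFunc func(opt *options, s *store) bool

func isDown(opt *options, s *store) bool         { return s.down }
func isDisconnected(opt *options, s *store) bool { return s.disconnected }
func isBusy(opt *options, s *store) bool         { return s.busy }

// anyConditionMatch mirrors the shape of StoreStateFilter.anyConditionMatch:
// the store is filtered out as soon as one condition matches.
func anyConditionMatch(funcs []conditionFunc, opt *options, s *store) bool {
	for _, cf := range funcs {
		if cf(opt, s) {
			return true
		}
	}
	return false
}

func main() {
	// Scatter targets now also exclude busy stores.
	funcs := []conditionFunc{isDown, isDisconnected, isBusy}
	fmt.Println(anyConditionMatch(funcs, &options{}, &store{busy: true})) // true: rejected
}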
18 changes: 18 additions & 0 deletions server/schedule/metrics.go
@@ -73,6 +73,22 @@ var (
Name: "store_limit_cost",
Help: "limit rate cost of store.",
}, []string{"store", "limit_type"})

scatterCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "scatter_operators_count",
Help: "Counter of region scatter operators.",
}, []string{"type", "event"})

scatterDistributionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "scatter_distribution",
Help: "Counter of the distribution in scatter.",
}, []string{"store", "is_leader", "engine"})
)

func init() {
@@ -83,4 +99,6 @@ func init() {
prometheus.MustRegister(storeLimitRateGauge)
prometheus.MustRegister(storeLimitCostCounter)
prometheus.MustRegister(operatorWaitCounter)
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
}
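metrics.go defines two new counter vectors: pd_schedule_scatter_operators_count with labels type (skip, fail, success) and event (the skip reason), and pd_schedule_scatter_distribution with labels store, is_leader, and engine. Below is a standalone sketch of the same define/register/increment pattern with client_golang; it uses its own registry and port so it does not collide with a real PD process (PD itself registers these collectors on the default registry in init).

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var scatterCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "schedule",
		Name:      "scatter_operators_count",
		Help:      "Counter of region scatter operators.",
	}, []string{"type", "event"})

func main() {
	// Separate registry so this sketch never clashes with PD's default registry.
	reg := prometheus.NewRegistry()
	reg.MustRegister(scatterCounter)

	// The event label only carries a value for skips; fail/success leave it empty.
	scatterCounter.WithLabelValues("skip", "empty-region").Inc()
	scatterCounter.WithLabelValues("success", "").Inc()

	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":2112", nil))
}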
36 changes: 32 additions & 4 deletions server/schedule/region_scatterer.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

@@ -137,6 +138,7 @@ const maxRetryLimit = 30
func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group string, retryLimit int) ([]*operator.Operator, map[uint64]error, error) {
regions := r.cluster.ScanRegions(startKey, endKey, -1)
if len(regions) < 1 {
scatterCounter.WithLabelValues("skip", "empty-region").Inc()
return nil, nil, errors.New("empty region")
}
failures := make(map[uint64]error, len(regions))
@@ -155,13 +157,16 @@ func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group s
// ScatterRegionsByID directly scatter regions by ScatterRegions
func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, retryLimit int) ([]*operator.Operator, map[uint64]error, error) {
if len(regionsID) < 1 {
scatterCounter.WithLabelValues("skip", "empty-region").Inc()
return nil, nil, errors.New("empty region")
}
failures := make(map[uint64]error, len(regionsID))
var regions []*core.RegionInfo
for _, id := range regionsID {
region := r.cluster.GetRegion(id)
if region == nil {
scatterCounter.WithLabelValues("skip", "no-region").Inc()
log.Warn("failed to find region during scatter", zap.Uint64("region-id", id))
failures[id] = errors.New(fmt.Sprintf("failed to find region %v", id))
continue
}
@@ -187,6 +192,7 @@ func (r *RegionScatterer) ScatterRegionsByID(regionsID []uint64, group string, r
// and the value of the failures indicates the failure error.
func (r *RegionScatterer) ScatterRegions(regions map[uint64]*core.RegionInfo, failures map[uint64]error, group string, retryLimit int) ([]*operator.Operator, error) {
if len(regions) < 1 {
scatterCounter.WithLabelValues("skip", "empty-region").Inc()
return nil, errors.New("empty region")
}
if retryLimit > maxRetryLimit {
@@ -226,14 +232,20 @@ func (r *RegionScatterer) ScatterRegions(regions map[uint64]*core.RegionInfo, fa
func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*operator.Operator, error) {
if !opt.IsRegionReplicated(r.cluster, region) {
r.cluster.AddSuspectRegions(region.GetID())
scatterCounter.WithLabelValues("skip", "not-replicated").Inc()
log.Warn("region not replicated during scatter", zap.Uint64("region-id", region.GetID()))
return nil, errors.Errorf("region %d is not fully replicated", region.GetID())
}

if region.GetLeader() == nil {
scatterCounter.WithLabelValues("skip", "no-leader").Inc()
log.Warn("region no leader during scatter", zap.Uint64("region-id", region.GetID()))
return nil, errors.Errorf("region %d has no leader", region.GetID())
}

if r.cluster.IsRegionHot(region) {
scatterCounter.WithLabelValues("skip", "hot").Inc()
log.Warn("region too hot during scatter", zap.Uint64("region-id", region.GetID()))
return nil, errors.Errorf("region %d is hot", region.GetID())
}

@@ -286,15 +298,19 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *

op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader)
if err != nil {
scatterCounter.WithLabelValues("fail", "").Inc()
for _, peer := range region.GetPeers() {
targetPeers[peer.GetStoreId()] = peer
}
r.Put(targetPeers, region.GetLeader().GetStoreId(), group)
log.Debug("fail to create scatter region operator", errs.ZapError(err))
return nil
}
r.Put(targetPeers, targetLeader, group)
op.SetPriorityLevel(core.HighPriority)
if op != nil {
scatterCounter.WithLabelValues("success", "").Inc()
r.Put(targetPeers, targetLeader, group)
op.SetPriorityLevel(core.HighPriority)
}
return op
}

@@ -305,15 +321,15 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
return nil
}
filters := []filter.Filter{
filter.NewExcludedFilter("scatter-region", nil, selectedStores),
filter.NewExcludedFilter(r.name, nil, selectedStores),
}
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster, region, sourceStore)
filters = append(filters, context.filters...)
filters = append(filters, scoreGuard)
stores := r.cluster.GetStores()
candidates := make([]uint64, 0)
for _, store := range stores {
if filter.Target(r.cluster.GetOpts(), store, filters) && !store.IsBusy() {
if filter.Target(r.cluster.GetOpts(), store, filters) {
candidates = append(candidates, store.GetID())
}
}
@@ -375,10 +391,22 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6
store := r.cluster.GetStore(storeID)
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
r.ordinaryEngine.selectedPeer.Put(storeID, group)
scatterDistributionCounter.WithLabelValues(
fmt.Sprintf("%v", storeID),
strconv.FormatBool(false),
filter.EngineTiKV).Inc()
} else {
engine := store.GetLabelValue(filter.EngineKey)
r.specialEngines[engine].selectedPeer.Put(storeID, group)
scatterDistributionCounter.WithLabelValues(
fmt.Sprintf("%v", storeID),
strconv.FormatBool(false),
engine).Inc()
}
}
r.ordinaryEngine.selectedLeader.Put(leaderStoreID, group)
scatterDistributionCounter.WithLabelValues(
fmt.Sprintf("%v", leaderStoreID),
strconv.FormatBool(true),
filter.EngineTiKV).Inc()
}
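Put records every selected peer (is_leader="false") and the chosen leader (is_leader="true"), using the store ID formatted as a string and the engine name as labels. Below is a small sketch of how these increments could be checked in a test, using the same label formatting and the testutil helper from client_golang; the counter here is a local copy for illustration, not PD's registered one.

package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	dist := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "pd",
			Subsystem: "schedule",
			Name:      "scatter_distribution",
			Help:      "Counter of the distribution in scatter.",
		}, []string{"store", "is_leader", "engine"})

	// Same label formatting as Put: store ID as a string, is_leader as "true"/"false".
	var leaderStoreID uint64 = 3
	dist.WithLabelValues(fmt.Sprintf("%v", leaderStoreID), strconv.FormatBool(true), "tikv").Inc()

	// testutil.ToFloat64 reads a single child of the vector back, which is handy in unit tests.
	got := testutil.ToFloat64(dist.WithLabelValues("3", "true", "tikv"))
	fmt.Println(got) // 1
}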
