Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/client: add cached regions metric in region router (#2994) #3034

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ var (
Name: "region_token",
Help: "size of region token in kv client",
}, []string{"store", "changefeed", "capture"})
cachedRegionSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "cached_region",
Help: "cached region that has not requested to TiKV in kv client",
}, []string{"store", "changefeed", "capture"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down Expand Up @@ -113,6 +120,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(sendEventCounter)
registry.MustRegister(clientChannelSize)
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(cachedRegionSize)
registry.MustRegister(batchResolvedEventSize)
registry.MustRegister(etcdRequestCounter)
registry.MustRegister(grpcPoolStreamGauge)
Expand Down
25 changes: 20 additions & 5 deletions cdc/kv/token_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,28 @@ type LimitRegionRouter interface {
Run(ctx context.Context) error
}

// srrMetrics keeps the Prometheus metrics of a sized region router.
// The capture and changefeed values are used as metric label values; the
// gauge maps are keyed by id, which is the TiKV store address.
type srrMetrics struct {
	capture    string
	changefeed string
	// mapping from id (TiKV store address) to the gauge of tokens used
	tokens map[string]prometheus.Gauge
	// mapping from id (TiKV store address) to the gauge of cached regions
	cachedRegions map[string]prometheus.Gauge
}

// newSrrMetrics builds an srrMetrics whose capture address and changefeed ID
// are read from ctx. Both per-store gauge maps start out empty.
func newSrrMetrics(ctx context.Context) *srrMetrics {
	captureAddr := util.CaptureAddrFromCtx(ctx)
	changefeed := util.ChangefeedIDFromCtx(ctx)
	return &srrMetrics{
		capture:       captureAddr,
		changefeed:    changefeed,
		tokens:        make(map[string]prometheus.Gauge),
		cachedRegions: make(map[string]prometheus.Gauge),
	}
}

// each changefeed on a capture maintains a sizedRegionRouter
type sizedRegionRouter struct {
buffer map[string][]singleRegionInfo
output chan singleRegionInfo
Expand Down Expand Up @@ -96,10 +102,16 @@ func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.output <- sri
} else {
r.buffer[id] = append(r.buffer[id], sri)
if _, ok := r.metrics.cachedRegions[id]; !ok {
r.metrics.cachedRegions[id] = cachedRegionSize.WithLabelValues(id, r.metrics.changefeed, r.metrics.capture)
}
r.metrics.cachedRegions[id].Inc()
}
r.lock.Unlock()
}

// Acquire implements LimitRegionRouter.Acquire
// param: id is TiKV store address
func (r *sizedRegionRouter) Acquire(id string) {
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -110,6 +122,8 @@ func (r *sizedRegionRouter) Acquire(id string) {
r.metrics.tokens[id].Inc()
}

// Release implements LimitRegionRouter.Release
// param: id is TiKV store address
func (r *sizedRegionRouter) Release(id string) {
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -131,7 +145,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error {
r.lock.Lock()
for id, buf := range r.buffer {
available := r.sizeLimit - r.tokens[id]
// the tokens used could be more than size limit, since we have
// a sized channel as level1 cache
if available <= 0 {
continue
Expand All @@ -156,6 +170,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error {
}
}
r.buffer[id] = r.buffer[id][available:]
r.metrics.cachedRegions[id].Sub(float64(available))
}
r.lock.Unlock()
}
Expand Down
212 changes: 210 additions & 2 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -5988,6 +5988,214 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The number of regions that have not connected to TiKV",
"fieldConfig": {
"defaults": {
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
"y": 81
},
"hiddenSeries": false,
"id": 251,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (capture, changefeed, store)",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-{{store}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "KV client cached regions",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "Estimate the remaining time for a changefeed initialization (on a specific capture)",
"fieldConfig": {
"defaults": {
"unit": "s"
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 81
},
"hiddenSeries": false,
"id": 252,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "abs(sum(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"} / deriv(ticdc_kvclient_cached_region{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (capture, changefeed, store))",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-{{store}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Estimate remaining time for initialization",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": "",
"logBase": 2,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Events",
Expand Down Expand Up @@ -9216,5 +9424,5 @@
"timezone": "browser",
"title": "Test-Cluster-TiCDC",
"uid": "YiGL8hBZ1",
"version": 22
}
"version": 23
}