-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
metrics.go
189 lines (168 loc) · 6.44 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package jobs
import (
"fmt"
"strings"
"time"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/util/metric"
io_prometheus_client "github.com/prometheus/client_model/go"
)
// Metrics are for production monitoring of each job type.
//
// The struct is filled in by (*Metrics).init, which does not populate the
// jobspb.TypeUnspecified slot of JobMetrics and only sets Changefeed when
// MakeChangefeedMetricsHook is non-nil.
type Metrics struct {
// JobMetrics holds the per-job-type metrics, indexed by jobspb.Type.
JobMetrics [jobspb.NumJobTypes]*JobTypeMetrics
// Changefeed holds the value returned by MakeChangefeedMetricsHook, if that
// hook was registered before init ran.
Changefeed metric.Struct
}
// JobTypeMetrics is a metric.Struct containing metrics for each type of job.
// It groups one gauge and six counters describing a single job type.
type JobTypeMetrics struct {
// CurrentlyRunning is the number of jobs currently running in Resume or
// OnFailOrCancel state.
CurrentlyRunning *metric.Gauge
// ResumeCompleted counts jobs which successfully resumed to completion.
ResumeCompleted *metric.Counter
// ResumeRetryError counts jobs which failed with a retriable error.
ResumeRetryError *metric.Counter
// ResumeFailed counts jobs which failed with a non-retriable error.
ResumeFailed *metric.Counter
// FailOrCancelCompleted counts jobs which successfully completed their
// failure or cancelation process.
FailOrCancelCompleted *metric.Counter
// FailOrCancelRetryError counts jobs which failed with a retriable error on
// their failure or cancelation process.
FailOrCancelRetryError *metric.Counter
// FailOrCancelFailed counts jobs which failed with a non-retriable error on
// their failure or cancelation process.
FailOrCancelFailed *metric.Counter
}
// MetricStruct implements the metric.Struct interface. The method has an
// empty body and takes a value receiver.
func (JobTypeMetrics) MetricStruct() {}
// makeMetaCurrentlyRunning builds the metadata for the gauge that reports how
// many jobs of the given type are currently running. typeStr is interpolated
// verbatim into both the metric name and the help text.
func makeMetaCurrentlyRunning(typeStr string) metric.Metadata {
	var meta metric.Metadata
	meta.Name = fmt.Sprintf("jobs.%s.currently_running", typeStr)
	meta.Help = fmt.Sprintf(
		"Number of %s jobs currently running in Resume or OnFailOrCancel state",
		typeStr,
	)
	meta.Measurement = "jobs"
	meta.Unit = metric.Unit_COUNT
	meta.MetricType = io_prometheus_client.MetricType_GAUGE
	return meta
}
// makeMetaResumeCompeted builds the metadata for the counter of jobs of the
// given type which successfully resumed to completion. typeStr is interpolated
// verbatim into both the metric name and the help text.
//
// The metric is constructed with metric.NewCounter in (*Metrics).init, so the
// metadata declares it as a counter rather than a gauge.
func makeMetaResumeCompeted(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.resume_completed", typeStr),
		Help: fmt.Sprintf("Number of %s jobs which successfully resumed to completion",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_COUNTER,
	}
}
// makeMetaResumeRetryError builds the metadata for the counter of jobs of the
// given type which failed with a retriable error. typeStr is interpolated
// verbatim into both the metric name and the help text.
//
// The metric is constructed with metric.NewCounter in (*Metrics).init, so the
// metadata declares it as a counter rather than a gauge.
func makeMetaResumeRetryError(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.resume_retry_error", typeStr),
		Help: fmt.Sprintf("Number of %s jobs which failed with a retriable error",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_COUNTER,
	}
}
// makeMetaResumeFailed builds the metadata for the counter of jobs of the
// given type which failed with a non-retriable error. typeStr is interpolated
// verbatim into both the metric name and the help text.
//
// The metric is constructed with metric.NewCounter in (*Metrics).init, so the
// metadata declares it as a counter rather than a gauge.
func makeMetaResumeFailed(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.resume_failed", typeStr),
		Help: fmt.Sprintf("Number of %s jobs which failed with a non-retriable error",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_COUNTER,
	}
}
// makeMetaFailOrCancelCompeted builds the metadata for the counter of jobs of
// the given type which successfully completed their failure or cancelation
// process. typeStr is interpolated verbatim into both the metric name and the
// help text.
//
// The metric is constructed with metric.NewCounter in (*Metrics).init, so the
// metadata declares it as a counter rather than a gauge.
func makeMetaFailOrCancelCompeted(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.fail_or_cancel_completed", typeStr),
		Help: fmt.Sprintf("Number of %s jobs which successfully completed "+
			"their failure or cancelation process",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_COUNTER,
	}
}
// makeMetaFailOrCancelRetryError builds the metadata for the counter of jobs
// of the given type which failed with a retriable error on their failure or
// cancelation process. typeStr is interpolated verbatim into both the metric
// name and the help text.
//
// The metric is constructed with metric.NewCounter in (*Metrics).init, so the
// metadata declares it as a counter rather than a gauge.
func makeMetaFailOrCancelRetryError(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.fail_or_cancel_retry_error", typeStr),
		Help: fmt.Sprintf("Number of %s jobs which failed with a retriable "+
			"error on their failure or cancelation process",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_COUNTER,
	}
}
// makeMetaFailOrCancelFailed builds the metadata for the counter of jobs of
// the given type which failed with a non-retriable error on their failure or
// cancelation process. typeStr is interpolated verbatim into both the metric
// name and the help text.
//
// The metric is constructed with metric.NewCounter in (*Metrics).init, so the
// metadata declares it as a counter rather than a gauge.
func makeMetaFailOrCancelFailed(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.fail_or_cancel_failed", typeStr),
		Help: fmt.Sprintf("Number of %s jobs which failed with a "+
			"non-retriable error on their failure or cancelation process",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_COUNTER,
	}
}
// MetricStruct implements the metric.Struct interface. The method has an
// empty body and takes a value receiver.
func (Metrics) MetricStruct() {}
// init initializes the metrics for job monitoring.
//
// If MakeChangefeedMetricsHook has been registered, its result for
// histogramWindowInterval is stored in m.Changefeed. Every job type other than
// jobspb.TypeUnspecified then gets a newly constructed JobTypeMetrics whose
// metric names embed the lower-cased type name with spaces replaced by
// underscores.
func (m *Metrics) init(histogramWindowInterval time.Duration) {
	if hook := MakeChangefeedMetricsHook; hook != nil {
		m.Changefeed = hook(histogramWindowInterval)
	}

	for idx := range m.JobMetrics {
		jobType := jobspb.Type(idx)
		if jobType == jobspb.TypeUnspecified {
			// do not track TypeUnspecified
			continue
		}

		name := strings.ToLower(strings.ReplaceAll(jobType.String(), " ", "_"))
		perType := &JobTypeMetrics{
			CurrentlyRunning:       metric.NewGauge(makeMetaCurrentlyRunning(name)),
			ResumeCompleted:        metric.NewCounter(makeMetaResumeCompeted(name)),
			ResumeRetryError:       metric.NewCounter(makeMetaResumeRetryError(name)),
			ResumeFailed:           metric.NewCounter(makeMetaResumeFailed(name)),
			FailOrCancelCompleted:  metric.NewCounter(makeMetaFailOrCancelCompeted(name)),
			FailOrCancelRetryError: metric.NewCounter(makeMetaFailOrCancelRetryError(name)),
			FailOrCancelFailed:     metric.NewCounter(makeMetaFailOrCancelFailed(name)),
		}
		m.JobMetrics[jobType] = perType
	}
}
// MakeChangefeedMetricsHook allows for registration of changefeed metrics from
// ccl code. It is nil unless assigned; when non-nil, (*Metrics).init calls it
// with the histogram window interval and stores the result in
// Metrics.Changefeed.
var MakeChangefeedMetricsHook func(time.Duration) metric.Struct
// JobTelemetryMetrics holds the telemetry counters for an individual job
// type. Instances are built by newJobTelemetryMetrics.
type JobTelemetryMetrics struct {
// Successful is the counter named "sql.schema.job.<job name>.successful".
Successful telemetry.Counter
// Failed is the counter named "sql.schema.job.<job name>.failed".
Failed telemetry.Counter
// Canceled is the counter named "sql.schema.job.<job name>.canceled".
Canceled telemetry.Counter
}
// newJobTelemetryMetrics creates a new JobTelemetryMetrics object for a given
// job type name. jobName is interpolated verbatim into the three counter
// names, which are obtained through telemetry.GetCounterOnce in the order
// successful, failed, canceled.
func newJobTelemetryMetrics(jobName string) *JobTelemetryMetrics {
	successful := telemetry.GetCounterOnce(
		fmt.Sprintf("sql.schema.job.%s.successful", jobName),
	)
	failed := telemetry.GetCounterOnce(
		fmt.Sprintf("sql.schema.job.%s.failed", jobName),
	)
	canceled := telemetry.GetCounterOnce(
		fmt.Sprintf("sql.schema.job.%s.canceled", jobName),
	)

	return &JobTelemetryMetrics{
		Successful: successful,
		Failed:     failed,
		Canceled:   canceled,
	}
}
// getJobTelemetryMetricsArray initializes an array of job related telemetry
// metrics. The array has one slot per job type; the jobspb.TypeUnspecified
// slot is left nil, and every other slot is built from the lower-cased type
// name with spaces replaced by underscores.
func getJobTelemetryMetricsArray() [jobspb.NumJobTypes]*JobTelemetryMetrics {
	var perType [jobspb.NumJobTypes]*JobTelemetryMetrics
	for idx := range perType {
		jobType := jobspb.Type(idx)
		// do not track TypeUnspecified
		if jobType != jobspb.TypeUnspecified {
			name := strings.ReplaceAll(jobType.String(), " ", "_")
			perType[idx] = newJobTelemetryMetrics(strings.ToLower(name))
		}
	}
	return perType
}
// TelemetryMetrics contains telemetry metrics for different
// job types. It is populated at package initialization by
// getJobTelemetryMetricsArray, which leaves the jobspb.TypeUnspecified
// entry nil.
var TelemetryMetrics = getJobTelemetryMetricsArray()