@@ -29,6 +29,7 @@ import (
29
29
"github.com/prometheus/common/model"
30
30
clientset "k8s.io/client-go/kubernetes"
31
31
"k8s.io/klog/v2"
32
+
32
33
)
33
34
34
35
const (
@@ -37,6 +38,9 @@ const (
37
38
probeIntervalDefault = 30 * time .Second
38
39
ksmNamespace = "kube-state-metrics-perf-test"
39
40
ksmServiceName = "kube-state-metrics"
41
+ crsmLatencyName = "CustomResourceStateMetricsLatency"
42
+ crsmNamespace = "custom-resource-state-metrics-perf-test"
43
+ crsmServiceName = "custom-resource-state-metrics"
40
44
ksmSelfPort = 8081
41
45
ksmMetricsPort = 8080
42
46
)
@@ -53,10 +57,25 @@ type ksmLatencyMeasurement struct {
53
57
wg sync.WaitGroup
54
58
}
55
59
60
+ type crsmLatencyMeasurement struct {
61
+ ctx context.Context
62
+ cancel func ()
63
+ isRunning bool
64
+ namespace string
65
+ serviceName string
66
+ metricsPort int
67
+ selfPort int
68
+ initialLatency * measurementutil.Histogram
69
+ wg sync.WaitGroup
70
+ }
71
+
56
72
func init () {
57
73
if err := measurement .Register (ksmLatencyName , CreateKSMLatencyMeasurement ); err != nil {
58
74
klog .Fatalf ("Cannot register %s: %v" , ksmLatencyName , err )
59
75
}
76
+ if err := measurement .Register (crsmLatencyName , CreateCRSMLatencyMeasurement ); err != nil {
77
+ klog .Fatalf ("Cannot register %s: %v" , crsmLatencyName , err )
78
+ }
60
79
}
61
80
62
81
// CreateKSMLatencyMeasurement creates a new Kube State
@@ -73,6 +92,19 @@ func CreateKSMLatencyMeasurement() measurement.Measurement {
73
92
}
74
93
}
75
94
95
+ // CreateCRSMLatencyMeasurement creates a new Custom Resource State Metrics Measurement.
96
+ func CreateCRSMLatencyMeasurement () measurement.Measurement {
97
+ ctx , cancel := context .WithCancel (context .Background ())
98
+ return & crsmLatencyMeasurement {
99
+ namespace : crsmNamespace ,
100
+ serviceName : crsmServiceName ,
101
+ selfPort : ksmSelfPort ,
102
+ metricsPort : ksmMetricsPort ,
103
+ ctx : ctx ,
104
+ cancel : cancel ,
105
+ }
106
+ }
107
+
76
108
// Execute supports two actions:
77
109
// - start - starts goroutine and queries /metrics every probeIntervalDefault interval,
78
110
// it also collects initial latency metrics.
@@ -100,35 +132,73 @@ func (m *ksmLatencyMeasurement) Execute(config *measurement.Config) ([]measureme
100
132
// the scrape interval so we should cancel.
101
133
m .startQuerying (m .ctx , client , probeIntervalDefault )
102
134
// Retrieve initial latency when first call is done.
103
- m .initialLatency , err = m .retrieveKSMLatencyMetrics (m .ctx , client )
135
+ m .initialLatency , err = m .retrieveLatencyMetrics (m .ctx , client )
104
136
return nil , err
105
137
case "gather" :
106
138
defer m .cancel ()
107
- return m .createKSMLatencySummary (m .ctx , client )
139
+ return m .createLatencySummary (m .ctx , client )
108
140
default :
109
141
return nil , fmt .Errorf ("unknown action %v" , action )
110
142
}
111
143
}
112
144
113
- func (m * ksmLatencyMeasurement ) stop () error {
114
- if ! m .isRunning {
115
- return fmt .Errorf ("%s: measurement was not running" , m )
145
+ // Execute for crsmLatencyMeasurement
146
+ func (m * crsmLatencyMeasurement ) Execute (config * measurement.Config ) ([]measurement.Summary , error ) {
147
+ if ! config .CloudProvider .Features ().SupportKubeStateMetrics {
148
+ klog .Infof ("not executing CRSMLatencyMeasurement: unsupported for provider, %s" , config .ClusterFramework .GetClusterConfig ().Provider .Name ())
149
+ return nil , nil
116
150
}
117
- m .cancel ()
118
- m .wg .Wait ()
119
- return nil
120
- }
121
-
122
- // createKSMLatencyReport gathers the latency one last time and creates the summary based on the Quantile from the sub histograms.
123
- // Afterwards it creates the Summary Report.
124
- func (m * ksmLatencyMeasurement ) createKSMLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
125
- latestLatency , err := m .retrieveKSMLatencyMetrics (ctx , client )
151
+ action , err := util .GetString (config .Params , "action" )
126
152
if err != nil {
127
153
return nil , err
128
154
}
129
- if err = m .stop (); err != nil {
155
+ client := config .ClusterFramework .GetClientSets ().GetClient ()
156
+ switch action {
157
+ case "start" :
158
+ if m .isRunning {
159
+ klog .V (2 ).Infof ("%s: measurement already running" , m )
160
+ return nil , nil
161
+ }
162
+ m .startQuerying (m .ctx , client , probeIntervalDefault )
163
+ m .initialLatency , err = m .retrieveLatencyMetrics (m .ctx , client )
164
+ return nil , err
165
+ case "gather" :
166
+ defer m .cancel ()
167
+ return m .createLatencySummary (m .ctx , client )
168
+ default :
169
+ return nil , fmt .Errorf ("unknown action %v" , action )
170
+ }
171
+ }
172
+
173
+ func getMetricsFromService (ctx context.Context , client clientset.Interface , namespace , serviceName string , port int ) (string , error ) {
174
+ ctx , cancel := context .WithTimeout (ctx , 2 * time .Minute )
175
+ defer cancel ()
176
+ out , err := client .CoreV1 ().RESTClient ().Get ().
177
+ Resource ("services" ).
178
+ SubResource ("proxy" ).
179
+ Namespace (namespace ).
180
+ Name (fmt .Sprintf ("%v:%v" , serviceName , port )).
181
+ Suffix ("metrics" ).
182
+ Do (ctx ).Raw ()
183
+ return string (out ), err
184
+ }
185
+
186
+ func (m * ksmLatencyMeasurement ) stop () error {
187
+ if ! m .isRunning {
188
+ return fmt .Errorf ("%s: measurement was not running" , m )
189
+ }
190
+ m .isRunning = false
191
+ m .cancel ()
192
+ m .wg .Wait ()
193
+ return nil
194
+ }
195
+
196
+ func (m * ksmLatencyMeasurement ) createLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
197
+ latestLatency , err := m .retrieveLatencyMetrics (ctx , client )
198
+ if err != nil {
130
199
return nil , err
131
200
}
201
+ m .stop ()
132
202
// We want to subtract the latest histogram from the first one we collect.
133
203
finalLatency := HistogramSub (latestLatency , m .initialLatency )
134
204
// Pretty Print the report.
@@ -149,61 +219,39 @@ func (m *ksmLatencyMeasurement) createKSMLatencySummary(ctx context.Context, cli
149
219
func (m * ksmLatencyMeasurement ) startQuerying (ctx context.Context , client clientset.Interface , interval time.Duration ) {
150
220
m .isRunning = true
151
221
m .wg .Add (1 )
152
- go m .queryLoop (ctx , client , interval )
153
- }
154
-
155
- func (m * ksmLatencyMeasurement ) queryLoop (ctx context.Context , client clientset.Interface , interval time.Duration ) {
156
- defer m .wg .Done ()
157
- for {
158
- select {
159
- case <- ctx .Done ():
160
- return
161
- case <- time .After (interval ):
162
- var output string
163
- output , err := m .getMetricsFromService (ctx , client , m .metricsPort )
164
- if err != nil {
165
- klog .V (2 ).Infof ("error during fetching metrics from service: %v" , err )
166
- }
167
- if output == "" {
168
- klog .V (2 ).Infof ("/metrics endpoint of kube-state-metrics returned no data in namespace: %s from service: %s port: %d" , m .namespace , m .serviceName , m .metricsPort )
222
+ go func () {
223
+ defer m .wg .Done ()
224
+ for {
225
+ select {
226
+ case <- ctx .Done ():
227
+ return
228
+ case <- time .After (interval ):
229
+ _ , err := getMetricsFromService (ctx , client , m .namespace , m .serviceName , m .metricsPort )
230
+ if err != nil {
231
+ klog .V (2 ).Infof ("error during fetching metrics from service: %v" , err )
232
+ }
169
233
}
170
-
171
234
}
172
- }
235
+ }()
173
236
}
174
237
175
- func (m * ksmLatencyMeasurement ) retrieveKSMLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
176
- ksmHist := measurementutil .NewHistogram (nil )
177
- output , err := m . getMetricsFromService (ctx , c , m .selfPort )
238
+ func (m * ksmLatencyMeasurement ) retrieveLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
239
+ hist := measurementutil .NewHistogram (nil )
240
+ output , err := getMetricsFromService (ctx , c , m . namespace , m . serviceName , m .selfPort )
178
241
if err != nil {
179
- return ksmHist , err
242
+ return hist , err
180
243
}
181
244
samples , err := measurementutil .ExtractMetricSamples (output )
182
245
if err != nil {
183
- return ksmHist , err
246
+ return hist , err
184
247
}
185
248
for _ , sample := range samples {
186
- switch sample .Metric [model .MetricNameLabel ] {
187
- case ksmRequestDurationMetricName :
188
- measurementutil .ConvertSampleToHistogram (sample , ksmHist )
249
+ if sample .Metric [model .MetricNameLabel ] == ksmRequestDurationMetricName {
250
+ measurementutil .ConvertSampleToHistogram (sample , hist )
189
251
}
190
252
}
191
- return ksmHist , nil
253
+ return hist , nil
192
254
}
193
-
194
- func (m * ksmLatencyMeasurement ) getMetricsFromService (ctx context.Context , client clientset.Interface , port int ) (string , error ) {
195
- ctx , cancel := context .WithTimeout (ctx , 2 * time .Minute )
196
- defer cancel ()
197
- out , err := client .CoreV1 ().RESTClient ().Get ().
198
- Resource ("services" ).
199
- SubResource ("proxy" ).
200
- Namespace (m .namespace ).
201
- Name (fmt .Sprintf ("%v:%v" , m .serviceName , port )).
202
- Suffix ("metrics" ).
203
- Do (ctx ).Raw ()
204
- return string (out ), err
205
- }
206
-
207
255
// Dispose cleans up after the measurement.
208
256
func (m * ksmLatencyMeasurement ) Dispose () {
209
257
if err := m .stop (); err != nil {
@@ -215,3 +263,78 @@ func (m *ksmLatencyMeasurement) Dispose() {
215
263
func (m * ksmLatencyMeasurement ) String () string {
216
264
return ksmLatencyName
217
265
}
266
+
267
+ func (m * crsmLatencyMeasurement ) stop () error {
268
+ if ! m .isRunning {
269
+ return fmt .Errorf ("%s: measurement was not running" , m )
270
+ }
271
+ m .isRunning = false
272
+ m .cancel ()
273
+ m .wg .Wait ()
274
+ return nil
275
+ }
276
+
277
+ func (m * crsmLatencyMeasurement ) createLatencySummary (ctx context.Context , client clientset.Interface ) ([]measurement.Summary , error ) {
278
+ latestLatency , err := m .retrieveLatencyMetrics (ctx , client )
279
+ if err != nil {
280
+ return nil , err
281
+ }
282
+ m .stop ()
283
+ finalLatency := HistogramSub (latestLatency , m .initialLatency )
284
+ result := & measurementutil.LatencyMetric {}
285
+ if err = SetQuantileFromHistogram (result , finalLatency ); err != nil {
286
+ return nil , err
287
+ }
288
+ content , err := util .PrettyPrintJSON (result )
289
+ if err != nil {
290
+ return nil , err
291
+ }
292
+ return []measurement.Summary {measurement .CreateSummary (crsmLatencyName , "json" , content )}, nil
293
+ }
294
+
295
+ func (m * crsmLatencyMeasurement ) startQuerying (ctx context.Context , client clientset.Interface , interval time.Duration ) {
296
+ m .isRunning = true
297
+ m .wg .Add (1 )
298
+ go func () {
299
+ defer m .wg .Done ()
300
+ for {
301
+ select {
302
+ case <- ctx .Done ():
303
+ return
304
+ case <- time .After (interval ):
305
+ _ , err := getMetricsFromService (ctx , client , m .namespace , m .serviceName , m .metricsPort )
306
+ if err != nil {
307
+ klog .V (2 ).Infof ("error during fetching metrics from service: %v" , err )
308
+ }
309
+ }
310
+ }
311
+ }()
312
+ }
313
+
314
+ func (m * crsmLatencyMeasurement ) retrieveLatencyMetrics (ctx context.Context , c clientset.Interface ) (* measurementutil.Histogram , error ) {
315
+ hist := measurementutil .NewHistogram (nil )
316
+ output , err := getMetricsFromService (ctx , c , m .namespace , m .serviceName , m .selfPort )
317
+ if err != nil {
318
+ return hist , err
319
+ }
320
+ samples , err := measurementutil .ExtractMetricSamples (output )
321
+ if err != nil {
322
+ return hist , err
323
+ }
324
+ for _ , sample := range samples {
325
+ if sample .Metric [model .MetricNameLabel ] == ksmRequestDurationMetricName {
326
+ measurementutil .ConvertSampleToHistogram (sample , hist )
327
+ }
328
+ }
329
+ return hist , nil
330
+ }
331
+
332
+ func (m * crsmLatencyMeasurement ) Dispose () {
333
+ if err := m .stop (); err != nil {
334
+ klog .V (2 ).Infof ("error during dispose call: %v" , err )
335
+ }
336
+ }
337
+
338
+ func (m * crsmLatencyMeasurement ) String () string {
339
+ return crsmLatencyName
340
+ }
0 commit comments