-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
stats.go
598 lines (522 loc) · 15 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2016 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package stats
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/mailru/easyjson/jwriter"
"gopkg.in/guregu/null.v3"
)
const (
counterString = "counter"
gaugeString = "gauge"
trendString = "trend"
rateString = "rate"
defaultString = "default"
timeString = "time"
dataString = "data"
)
// Possible values for MetricType.
const (
Counter = MetricType(iota) // A counter that sums its data points
Gauge // A gauge that displays the latest value
Trend // A trend, min/max/avg/med are interesting
Rate // A rate, displays % of values that aren't 0
)
// Possible values for ValueType.
const (
Default = ValueType(iota) // Values are presented as-is
Time // Values are timestamps (nanoseconds)
Data // Values are data amounts (bytes)
)
// The serialized metric type is invalid.
var ErrInvalidMetricType = errors.New("invalid metric type")
// The serialized value type is invalid.
var ErrInvalidValueType = errors.New("invalid value type")
// A MetricType specifies the type of a metric.
type MetricType int
// MarshalJSON serializes a MetricType as a human readable string.
func (t MetricType) MarshalJSON() ([]byte, error) {
txt, err := t.MarshalText()
if err != nil {
return nil, err
}
return []byte(`"` + string(txt) + `"`), nil
}
// MarshalText serializes a MetricType as a human readable string.
func (t MetricType) MarshalText() ([]byte, error) {
switch t {
case Counter:
return []byte(counterString), nil
case Gauge:
return []byte(gaugeString), nil
case Trend:
return []byte(trendString), nil
case Rate:
return []byte(rateString), nil
default:
return nil, ErrInvalidMetricType
}
}
// UnmarshalText deserializes a MetricType from a string representation.
func (t *MetricType) UnmarshalText(data []byte) error {
switch string(data) {
case counterString:
*t = Counter
case gaugeString:
*t = Gauge
case trendString:
*t = Trend
case rateString:
*t = Rate
default:
return ErrInvalidMetricType
}
return nil
}
func (t MetricType) String() string {
switch t {
case Counter:
return counterString
case Gauge:
return gaugeString
case Trend:
return trendString
case Rate:
return rateString
default:
return "[INVALID]"
}
}
// The type of values a metric contains.
type ValueType int
// MarshalJSON serializes a ValueType to a JSON string.
func (t ValueType) MarshalJSON() ([]byte, error) {
txt, err := t.MarshalText()
if err != nil {
return nil, err
}
return []byte(`"` + string(txt) + `"`), nil
}
// MarshalText serializes a ValueType as a human readable string.
func (t ValueType) MarshalText() ([]byte, error) {
switch t {
case Default:
return []byte(defaultString), nil
case Time:
return []byte(timeString), nil
case Data:
return []byte(dataString), nil
default:
return nil, ErrInvalidValueType
}
}
// UnmarshalText deserializes a ValueType from a string representation.
func (t *ValueType) UnmarshalText(data []byte) error {
switch string(data) {
case defaultString:
*t = Default
case timeString:
*t = Time
case dataString:
*t = Data
default:
return ErrInvalidValueType
}
return nil
}
func (t ValueType) String() string {
switch t {
case Default:
return defaultString
case Time:
return timeString
case Data:
return dataString
default:
return "[INVALID]"
}
}
// SampleTags is an immutable string[string] map for tags. Once a tag
// set is created, direct modification is prohibited. It has
// copy-on-write semantics and uses pointers for faster comparison
// between maps, since the same tag set is often used for multiple samples.
// All methods should not panic, even if they are called on a nil pointer.
//easyjson:skip
type SampleTags struct {
tags map[string]string
json []byte
}
// Get returns an empty string and false if the the requested key is not
// present or its value and true if it is.
func (st *SampleTags) Get(key string) (string, bool) {
if st == nil {
return "", false
}
val, ok := st.tags[key]
return val, ok
}
// IsEmpty checks for a nil pointer or zero tags.
// It's necessary because of this envconfig issue: https://github.com/kelseyhightower/envconfig/issues/113
func (st *SampleTags) IsEmpty() bool {
return st == nil || len(st.tags) == 0
}
// IsEqual tries to compare two tag sets with maximum efficiency.
func (st *SampleTags) IsEqual(other *SampleTags) bool {
if st == other {
return true
}
if st == nil || other == nil || len(st.tags) != len(other.tags) {
return false
}
for k, v := range st.tags {
if otherv, ok := other.tags[k]; !ok || v != otherv {
return false
}
}
return true
}
func (st *SampleTags) Contains(other *SampleTags) bool {
if st == other || other == nil {
return true
}
if st == nil || len(st.tags) < len(other.tags) {
return false
}
for k, v := range other.tags {
if myv, ok := st.tags[k]; !ok || myv != v {
return false
}
}
return true
}
// MarshalJSON serializes SampleTags to a JSON string and caches
// the result. It is not thread safe in the sense that the Go race
// detector will complain if it's used concurrently, but no data
// should be corrupted.
func (st *SampleTags) MarshalJSON() ([]byte, error) {
if st.IsEmpty() {
return []byte("null"), nil
}
if st.json != nil {
return st.json, nil
}
res, err := json.Marshal(st.tags)
if err != nil {
return res, err
}
st.json = res
return res, nil
}
// MarshalEasyJSON supports easyjson.Marshaler interface
func (st *SampleTags) MarshalEasyJSON(w *jwriter.Writer) {
w.RawByte('{')
first := true
for k, v := range st.tags {
if first {
first = false
} else {
w.RawByte(',')
}
w.String(k)
w.RawByte(':')
w.String(v)
}
w.RawByte('}')
}
// UnmarshalJSON deserializes SampleTags from a JSON string.
func (st *SampleTags) UnmarshalJSON(data []byte) error {
if st == nil {
*st = SampleTags{}
}
return json.Unmarshal(data, &st.tags)
}
// CloneTags copies the underlying set of a sample tags and
// returns it. If the receiver is nil, it returns an empty non-nil map.
func (st *SampleTags) CloneTags() map[string]string {
if st == nil {
return map[string]string{}
}
res := make(map[string]string, len(st.tags))
for k, v := range st.tags {
res[k] = v
}
return res
}
// NewSampleTags *copies* the supplied tag set and returns a new SampleTags
// instance with the key-value pairs from it.
func NewSampleTags(data map[string]string) *SampleTags {
if len(data) == 0 {
return nil
}
tags := map[string]string{}
for k, v := range data {
tags[k] = v
}
return &SampleTags{tags: tags}
}
// IntoSampleTags "consumes" the passed map and creates a new SampleTags
// struct with the data. The map is set to nil as a hint that it shouldn't
// be changed after it has been transformed into an "immutable" tag set.
// Oh, how I miss Rust and move semantics... :)
func IntoSampleTags(data *map[string]string) *SampleTags {
if len(*data) == 0 {
return nil
}
res := SampleTags{tags: *data}
*data = nil
return &res
}
// A Sample is a single measurement.
type Sample struct {
Metric *Metric
Time time.Time
Tags *SampleTags
Value float64
}
// SampleContainer is a simple abstraction that allows sample
// producers to attach extra information to samples they return
type SampleContainer interface {
GetSamples() []Sample
}
// Samples is just the simplest SampleContainer implementation
// that will be used when there's no need for extra information
type Samples []Sample
// GetSamples just implements the SampleContainer interface
func (s Samples) GetSamples() []Sample {
return s
}
// ConnectedSampleContainer is an extension of the SampleContainer
// interface that should be implemented when emitted samples
// are connected and share the same time and tags.
type ConnectedSampleContainer interface {
SampleContainer
GetTags() *SampleTags
GetTime() time.Time
}
// ConnectedSamples is the simplest ConnectedSampleContainer
// implementation that will be used when there's no need for
// extra information
type ConnectedSamples struct {
Samples []Sample
Tags *SampleTags
Time time.Time
}
// GetSamples implements the SampleContainer and ConnectedSampleContainer
// interfaces and returns the stored slice with samples.
func (cs ConnectedSamples) GetSamples() []Sample {
return cs.Samples
}
// GetTags implements ConnectedSampleContainer interface and returns stored tags.
func (cs ConnectedSamples) GetTags() *SampleTags {
return cs.Tags
}
// GetTime implements ConnectedSampleContainer interface and returns stored time.
func (cs ConnectedSamples) GetTime() time.Time {
return cs.Time
}
// GetSamples implement the ConnectedSampleContainer interface
// for a single Sample, since it's obviously connected with itself :)
func (s Sample) GetSamples() []Sample {
return []Sample{s}
}
// GetTags implements ConnectedSampleContainer interface
// and returns the sample's tags.
func (s Sample) GetTags() *SampleTags {
return s.Tags
}
// GetTime just implements ConnectedSampleContainer interface
// and returns the sample's time.
func (s Sample) GetTime() time.Time {
return s.Time
}
// Ensure that interfaces are implemented correctly
var (
_ SampleContainer = Sample{}
_ SampleContainer = Samples{}
)
var (
_ ConnectedSampleContainer = Sample{}
_ ConnectedSampleContainer = ConnectedSamples{}
)
// GetBufferedSamples will read all present (i.e. buffered or currently being pushed)
// values in the input channel and return them as a slice.
func GetBufferedSamples(input <-chan SampleContainer) (result []SampleContainer) {
for {
select {
case val, ok := <-input:
if !ok {
return
}
result = append(result, val)
default:
return
}
}
}
// PushIfNotDone first checks if the supplied context is done and doesn't push
// the sample container if it is.
func PushIfNotDone(ctx context.Context, output chan<- SampleContainer, sample SampleContainer) bool {
if ctx.Err() != nil {
return false
}
output <- sample
return true
}
// TODO: move to the metrics/ package
// A Metric defines the shape of a set of data.
type Metric struct {
Name string `json:"name"`
Type MetricType `json:"type"`
Contains ValueType `json:"contains"`
// TODO: decouple the metrics from the sinks and thresholds... have them
// linked, but not in the same struct?
Tainted null.Bool `json:"tainted"`
Thresholds Thresholds `json:"thresholds"`
Submetrics []*Submetric `json:"submetrics"`
Sub *Submetric `json:"-"`
Sink Sink `json:"-"`
Observed bool `json:"-"`
}
// Sample samples the metric at the given time, with the provided tags and value
func (m *Metric) Sample(t time.Time, tags *SampleTags, value float64) Sample {
return Sample{
Time: t,
Tags: tags,
Value: value,
Metric: m,
}
}
func New(name string, typ MetricType, t ...ValueType) *Metric {
vt := Default
if len(t) > 0 {
vt = t[0]
}
var sink Sink
switch typ {
case Counter:
sink = &CounterSink{}
case Gauge:
sink = &GaugeSink{}
case Trend:
sink = &TrendSink{}
case Rate:
sink = &RateSink{}
default:
return nil
}
return &Metric{Name: name, Type: typ, Contains: vt, Sink: sink}
}
// A Submetric represents a filtered dataset based on a parent metric.
type Submetric struct {
Name string `json:"name"`
Suffix string `json:"suffix"` // TODO: rename?
Tags *SampleTags `json:"tags"`
Metric *Metric `json:"-"`
Parent *Metric `json:"-"`
}
// AddSubmetric creates a new submetric from the key:value threshold definition
// and adds it to the metric's submetrics list.
func (m *Metric) AddSubmetric(keyValues string) (*Submetric, error) {
keyValues = strings.TrimSpace(keyValues)
if len(keyValues) == 0 {
return nil, fmt.Errorf("submetric criteria for metric '%s' cannot be empty", m.Name)
}
kvs := strings.Split(keyValues, ",")
rawTags := make(map[string]string, len(kvs))
for _, kv := range kvs {
if kv == "" {
continue
}
parts := strings.SplitN(kv, ":", 2)
key := strings.Trim(strings.TrimSpace(parts[0]), `"'`)
if len(parts) != 2 {
rawTags[key] = ""
continue
}
value := strings.Trim(strings.TrimSpace(parts[1]), `"'`)
rawTags[key] = value
}
tags := IntoSampleTags(&rawTags)
for _, sm := range m.Submetrics {
if sm.Tags.IsEqual(tags) {
return nil, fmt.Errorf(
"sub-metric with params '%s' already exists for metric %s: %s",
keyValues, m.Name, sm.Name,
)
}
}
subMetric := &Submetric{
Name: m.Name + "{" + keyValues + "}",
Suffix: keyValues,
Tags: tags,
Parent: m,
}
subMetricMetric := New(subMetric.Name, m.Type, m.Contains)
subMetricMetric.Sub = subMetric // sigh
subMetric.Metric = subMetricMetric
m.Submetrics = append(m.Submetrics, subMetric)
return subMetric, nil
}
// parsePercentile is a helper function to parse and validate percentile notations
func parsePercentile(stat string) (float64, error) {
if !strings.HasPrefix(stat, "p(") || !strings.HasSuffix(stat, ")") {
return 0, fmt.Errorf("invalid trend stat '%s', unknown format", stat)
}
percentile, err := strconv.ParseFloat(stat[2:len(stat)-1], 64)
if err != nil || (percentile < 0) || (percentile > 100) {
return 0, fmt.Errorf("invalid percentile trend stat value '%s', provide a number between 0 and 100", stat)
}
return percentile, nil
}
// GetResolversForTrendColumns checks if passed trend columns are valid for use in
// the summary output and then returns a map of the corresponding resolvers.
func GetResolversForTrendColumns(trendColumns []string) (map[string]func(s *TrendSink) float64, error) {
staticResolvers := map[string]func(s *TrendSink) float64{
"avg": func(s *TrendSink) float64 { return s.Avg },
"min": func(s *TrendSink) float64 { return s.Min },
"med": func(s *TrendSink) float64 { return s.Med },
"max": func(s *TrendSink) float64 { return s.Max },
"count": func(s *TrendSink) float64 { return float64(s.Count) },
}
dynamicResolver := func(percentile float64) func(s *TrendSink) float64 {
return func(s *TrendSink) float64 {
return s.P(percentile / 100)
}
}
result := make(map[string]func(s *TrendSink) float64, len(trendColumns))
for _, stat := range trendColumns {
if staticStat, ok := staticResolvers[stat]; ok {
result[stat] = staticStat
continue
}
percentile, err := parsePercentile(stat)
if err != nil {
return nil, err
}
result[stat] = dynamicResolver(percentile)
}
return result, nil
}