@@ -12,8 +12,8 @@ import (
12
12
)
13
13
14
14
var (
15
- httpError = stats .NewCounter32 ("parrot.monitoring.error;error=http" )
16
- invalidError = stats .NewCounter32 ("parrot.monitoring.error;error=invalid" )
15
+ httpError = stats .NewCounter32 ("parrot.monitoring.error;error=http" )
16
+ invalidError = stats .NewCounter32 ("parrot.monitoring.error;error=invalid" )
17
17
)
18
18
19
19
type seriesStats struct {
@@ -24,12 +24,23 @@ type seriesStats struct {
24
24
deltaSum float64
25
25
//the number of timestamps where value != ts
26
26
numNonMatching int32
27
-
28
27
//tracks the last seen non-NaN time stamp (useful for lag
29
28
lastSeen uint32
30
29
}
31
30
31
+ type partitionMetrics struct {
32
+ //number of missing values for each series
33
+ nanCount * stats.Gauge32
34
+ //time since the last value was recorded
35
+ lag * stats.Gauge32
36
+ //total amount of drift between expected value and actual values
37
+ deltaSum * stats.Gauge32
38
+ //total number of entries where drift occurred
39
+ nonMatching * stats.Gauge32
40
+ }
41
+
32
42
func monitor () {
43
+ metricsBySeries := initMetricsBySeries ()
33
44
for tick := range time .NewTicker (queryInterval ).C {
34
45
35
46
query := graphite .ExecuteRenderQuery (buildRequest (tick ))
@@ -65,16 +76,27 @@ func monitor() {
65
76
}
66
77
}
67
78
68
- //number of missing values for each series
69
- stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.nancount;partition=%d" , partition )).Set (int (serStats .nans ))
70
- //time since the last value was recorded
71
- stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.lag;partition=%d" , partition )).Set (int (serStats .lastTs - serStats .lastSeen ))
72
- //total amount of drift between expected value and actual values
73
- stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.deltaSum;partition=%d" , partition )).Set (int (serStats .deltaSum ))
74
- //total number of entries where drift occurred
75
- stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.nonMatching;partition=%d" , partition )).Set (int (serStats .numNonMatching ))
79
+ metrics := metricsBySeries [partition ]
80
+ metrics .nanCount .Set (int (serStats .nans ))
81
+ metrics .lag .Set (int (serStats .lastTs - serStats .lastSeen ))
82
+ metrics .deltaSum .Set (int (serStats .deltaSum ))
83
+ metrics .nonMatching .Set (int (serStats .numNonMatching ))
84
+ }
85
+ }
86
+ }
87
+
88
+ func initMetricsBySeries () []partitionMetrics {
89
+ var metricsBySeries []partitionMetrics
90
+ for p := 0 ; p < int (partitionCount ); p ++ {
91
+ metrics := partitionMetrics {
92
+ nanCount : stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.nancount;partition=%d" , p )),
93
+ lag : stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.lag;partition=%d" , p )),
94
+ deltaSum : stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.deltaSum;partition=%d" , p )),
95
+ nonMatching : stats .NewGauge32 (fmt .Sprintf ("parrot.monitoring.nonMatching;partition=%d" , p )),
76
96
}
97
+ metricsBySeries = append (metricsBySeries , metrics )
77
98
}
99
+ return metricsBySeries
78
100
}
79
101
80
102
func buildRequest (now time.Time ) * http.Request {
0 commit comments