Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 36498c0

Browse files
committed
Add additional validation for correct number of points that are correctly spaced etc
1 parent 1f707a6 commit 36498c0

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

cmd/mt-parrot/monitor.go

+29-5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ type seriesStats struct {
3030
numNonMatching int32
3131
//tracks the last seen non-NaN time stamp (useful for lag
3232
lastSeen uint32
33+
//the expected number of points were received
34+
correctNumPoints bool
35+
//the last ts matches `now`
36+
correctAlignment bool
37+
//all points are sorted and 1 period apart
38+
correctSpacing bool
3339
}
3440

3541
type partitionMetrics struct {
@@ -58,25 +64,31 @@ func monitor() {
5864
}
5965

6066
for _, s := range query.Decoded {
61-
processPartitionSeries(s)
67+
processPartitionSeries(s, tick)
6268
}
6369
}
6470
}
6571

66-
67-
func processPartitionSeries(s graphite.Series) {
68-
log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts)
72+
func processPartitionSeries(s graphite.Series, now time.Time) {
6973
partition, err := strconv.Atoi(s.Target)
7074
if err != nil {
7175
log.Debug("unable to parse partition", err)
7276
invalidError.Inc()
7377
return
7478
}
79+
if len(s.Datapoints) < 2 {
80+
log.Debugf("partition has invalid number of datapoints: %d", len(s.Datapoints))
81+
invalidError.Inc()
82+
return
83+
}
84+
7585
serStats := seriesStats{}
7686
serStats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts
87+
serStats.correctAlignment = int64(serStats.lastTs) == now.Unix()
88+
serStats.correctNumPoints = len(s.Datapoints) == int(lookbackPeriod/testMetricsInterval)+1
89+
serStats.correctSpacing = checkSpacing(s.Datapoints)
7790

7891
for _, dp := range s.Datapoints {
79-
8092
if math.IsNaN(dp.Val) {
8193
serStats.nans += 1
8294
continue
@@ -96,6 +108,18 @@ func processPartitionSeries(s graphite.Series) {
96108
metrics.nonMatching.Set(int(serStats.numNonMatching))
97109
}
98110

111+
func checkSpacing(points []graphite.Point) bool {
112+
previous := points[0].Ts
113+
for i := 1; i < len(points); i++ {
114+
current := points[i].Ts
115+
if current-previous != uint32(testMetricsInterval.Seconds()) {
116+
return false
117+
}
118+
previous = current
119+
}
120+
return true
121+
}
122+
99123
func initMetricsBySeries() {
100124
for p := 0; p < int(partitionCount); p++ {
101125
metrics := partitionMetrics{

0 commit comments

Comments
 (0)