Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit c134e22

Browse files
author
woodsaj
committed
Automatic detection of metric interval
If a metric is received with an Interval field set to 0, then we need to automatically detect the interval. This is achieved by buffering the last 3 points in memory. We then sort the points by timestamp and get the time difference between the first 2, and last 2. The lower difference is used as the interval. Metrics are delayed from being processed until the Interval is known, so when a new series is sent it will take 3x interval before any data is queriable. Future effort needs to be done to adjust the interval to known valid intervals. eg 1s, 5s, 10s, 30s, 1m, 5m, etc....
1 parent 8ac5a28 commit c134e22

File tree

4 files changed

+122
-47
lines changed

4 files changed

+122
-47
lines changed

input/carbon/carbon.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ type Carbon struct {
3333
handlerWaitGroup sync.WaitGroup
3434
quit chan struct{}
3535
connTrack *ConnTrack
36-
intervalGetter IntervalGetter
3736
}
3837

3938
type ConnTrack struct {
@@ -102,10 +101,6 @@ func New() *Carbon {
102101
}
103102
}
104103

105-
func (c *Carbon) IntervalGetter(i IntervalGetter) {
106-
c.intervalGetter = i
107-
}
108-
109104
func (c *Carbon) Start(handler input.Handler, fatal chan struct{}) error {
110105
c.Handler = handler
111106
l, err := net.ListenTCP("tcp", c.addr)
@@ -186,17 +181,18 @@ func (c *Carbon) handle(conn net.Conn) {
186181
}
187182
nameSplits := strings.Split(string(key), ";")
188183
md := &schema.MetricData{
184+
Id: nameSplits[0], //this is a temporary ID. It will be updated once we determine the Interval
189185
Name: nameSplits[0],
190186
Metric: nameSplits[0],
191-
Interval: c.intervalGetter.GetInterval(nameSplits[0]),
187+
Interval: 0,
192188
Value: val,
193189
Unit: "unknown",
194190
Time: int64(ts),
195191
Mtype: "gauge",
196192
Tags: nameSplits[1:],
197193
OrgId: 1, // admin org
198194
}
199-
md.SetId()
195+
200196
metricsPerMessage.ValueUint32(1)
201197
c.Handler.Process(md, int32(partitionId))
202198
}

input/carbon/intervalgetter.go

-34
This file was deleted.

input/input.go

+119-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ package input
44

55
import (
66
"fmt"
7+
"sort"
8+
"sync"
79
"time"
810

911
"github.com/grafana/metrictank/idx"
@@ -29,10 +31,21 @@ type DefaultHandler struct {
2931

3032
metrics mdata.Metrics
3133
metricIndex idx.MetricIndex
34+
35+
buffer sync.Map
36+
}
37+
38+
type IntervalLookupRecord struct {
39+
sync.Mutex
40+
41+
Interval int
42+
DataPoints []*schema.MetricData
43+
Last time.Time
44+
pos int
3245
}
3346

34-
func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input string) DefaultHandler {
35-
return DefaultHandler{
47+
func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input string) *DefaultHandler {
48+
return &DefaultHandler{
3649
metricsReceived: stats.NewCounter32(fmt.Sprintf("input.%s.metrics_received", input)),
3750
MetricInvalid: stats.NewCounter32(fmt.Sprintf("input.%s.metric_invalid", input)),
3851
MsgsAge: stats.NewMeter32(fmt.Sprintf("input.%s.message_age", input), false),
@@ -46,10 +59,17 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
4659

4760
// process makes sure the data is stored and the metadata is in the index
4861
// concurrency-safe.
49-
func (in DefaultHandler) Process(metric *schema.MetricData, partition int32) {
62+
func (in *DefaultHandler) Process(metric *schema.MetricData, partition int32) {
5063
if metric == nil {
5164
return
5265
}
66+
67+
if metric.Interval <= 0 {
68+
// we need to buffer enough of this metric to be able to work out its interval.
69+
// once that is done, we can update the Id and call in.Process again
70+
in.SetInterval(metric, partition)
71+
return
72+
}
5373
in.metricsReceived.Inc()
5474
err := metric.Validate()
5575
if err != nil {
@@ -72,3 +92,99 @@ func (in DefaultHandler) Process(metric *schema.MetricData, partition int32) {
7292
m.Add(uint32(metric.Time), metric.Value)
7393
in.pressureTank.Add(int(time.Since(pre).Nanoseconds()))
7494
}
95+
96+
func (in *DefaultHandler) SetInterval(metric *schema.MetricData, partition int32) {
97+
b, ok := in.buffer.Load(metric.Id)
98+
if !ok {
99+
dp := make([]*schema.MetricData, 1, 3)
100+
dp[0] = metric
101+
in.buffer.Store(metric.Id, &IntervalLookupRecord{
102+
DataPoints: dp,
103+
})
104+
return
105+
}
106+
ilr := b.(*IntervalLookupRecord)
107+
ilr.Lock()
108+
// check for duplicate TS
109+
if ilr.DataPoints[ilr.pos].Time == metric.Time {
110+
//drop the metric as it is a duplicate.
111+
ilr.Unlock()
112+
return
113+
}
114+
115+
// add this datapoint to our circular buffer
116+
if len(ilr.DataPoints) < 3 {
117+
ilr.DataPoints = append(ilr.DataPoints, metric)
118+
ilr.pos++
119+
} else {
120+
if ilr.pos == 2 {
121+
ilr.pos = 0
122+
} else {
123+
ilr.pos++
124+
}
125+
ilr.DataPoints[ilr.pos] = metric
126+
}
127+
128+
// if the interval is already known and was updated in the last 24hours, use it.
129+
if ilr.Interval != 0 && time.Since(ilr.Last) > 84600 {
130+
metric.Interval = ilr.Interval
131+
metric.SetId()
132+
in.Process(metric, partition)
133+
ilr.Unlock()
134+
return
135+
}
136+
137+
if len(ilr.DataPoints) < 3 {
138+
// we dont have 3 points yet.
139+
ilr.Unlock()
140+
return
141+
}
142+
log.Debug("input: calculating interval of %s", metric.Id)
143+
144+
delta1 := ilr.DataPoints[1].Time - ilr.DataPoints[0].Time
145+
// make sure the points are not out of order
146+
if delta1 < 0 {
147+
delta1 = -1 * delta1
148+
}
149+
// make sure the points are not out of order
150+
delta2 := ilr.DataPoints[2].Time - ilr.DataPoints[1].Time
151+
if delta2 < 0 {
152+
delta2 = -1 * delta2
153+
}
154+
interval := 0
155+
156+
// To protect against dropped metrics and out of order metrics, use the smallest delta as the interval.
157+
// Because we have 3 datapoints, it doesnt matter what order they are in. The smallest delta is always
158+
// going to be correct. (unless their are out of order and dropped metrics)
159+
if delta1 <= delta2 {
160+
interval = int(delta1)
161+
} else {
162+
interval = int(delta2)
163+
}
164+
165+
// TODO: align the interval to the likely value. eg. if the value is 27, make it 30. if it is 68, use 60. etc...
166+
167+
if ilr.Last.IsZero() {
168+
// sort the datapoints then update their interval and process them properly.
169+
sort.Sort(SortedMetricData(ilr.DataPoints))
170+
ilr.pos = 2 // as the metrics are sorted, the oldest point is at the end of the slice
171+
for _, md := range ilr.DataPoints {
172+
md.Interval = interval
173+
md.SetId()
174+
in.Process(md, partition)
175+
}
176+
} else {
177+
metric.Interval = interval
178+
metric.SetId()
179+
in.Process(metric, partition)
180+
}
181+
ilr.Interval = interval
182+
ilr.Last = time.Now()
183+
ilr.Unlock()
184+
}
185+
186+
type SortedMetricData []*schema.MetricData
187+
188+
func (a SortedMetricData) Len() int { return len(a) }
189+
func (a SortedMetricData) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
190+
func (a SortedMetricData) Less(i, j int) bool { return a[i].Time < a[j].Time }

metrictank.go

-3
Original file line numberDiff line numberDiff line change
@@ -379,9 +379,6 @@ func main() {
379379
***********************************/
380380
pluginFatal := make(chan struct{})
381381
for _, plugin := range inputs {
382-
if carbonPlugin, ok := plugin.(*inCarbon.Carbon); ok {
383-
carbonPlugin.IntervalGetter(inCarbon.NewIndexIntervalGetter(metricIndex))
384-
}
385382
err = plugin.Start(input.NewDefaultHandler(metrics, metricIndex, plugin.Name()), pluginFatal)
386383
if err != nil {
387384
shutdown()

0 commit comments

Comments
 (0)