
WIP: Automatic detection of metric interval #849

Closed
wants to merge 11 commits
api/ccache_test.go: 7 changes (6 additions & 1 deletion)
@@ -15,6 +15,7 @@ import (
 	"github.com/grafana/metrictank/idx/memory"
 	"github.com/grafana/metrictank/mdata"
 	"github.com/grafana/metrictank/mdata/cache"
+	"github.com/grafana/metrictank/mdata/memorystore"
 	"gopkg.in/raintank/schema.v1"
 )

@@ -32,11 +33,15 @@ func newSrv(delSeries, delArchives int) (*Server, *cache.MockCache) {
 	mockCache := cache.NewMockCache()
 	mockCache.DelMetricSeries = delSeries
 	mockCache.DelMetricArchives = delArchives
-	metrics := mdata.NewAggMetrics(store, mockCache, false, 0, 0, 0)
+	metrics := memorystore.NewAggMetrics(0, 0, 0)
+	mdata.MemoryStore = metrics
+	mdata.BackendStore = store
+	mdata.Cache = mockCache
 	srv.BindMemoryStore(metrics)
 	srv.BindCache(mockCache)
 
 	metricIndex := memory.New()
+	mdata.Idx = metricIndex
 	srv.BindMetricIndex(metricIndex)
 	return srv, mockCache
 }
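The setup above no longer injects the store and cache through mdata.NewAggMetrics; it assigns package-level variables on mdata and builds the in-memory store via the new memorystore package. A minimal sketch of the globals this implies, where the stub types are assumptions inferred from the call sites rather than the repository's actual declarations:

package mdata

import (
	"github.com/grafana/metrictank/idx"
	"github.com/grafana/metrictank/mdata/cache"
)

// Stub interfaces standing in for the real mdata types, which this diff
// does not show.
type Metrics interface{}
type Store interface{}

// Package-level singletons that tests (and presumably the main binary's
// startup code) now assign directly instead of passing into NewAggMetrics.
var (
	MemoryStore  Metrics         // in-memory series store, e.g. *memorystore.AggMetrics
	BackendStore Store           // long-term chunk store, e.g. Cassandra or a mock
	Cache        cache.Cache     // chunk cache, e.g. *cache.MockCache in tests
	Idx          idx.MetricIndex // metric index, e.g. the memory index used above
)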
api/dataprocessor_test.go: 38 changes (25 additions & 13 deletions)
@@ -15,6 +15,7 @@ import (
 	"github.com/grafana/metrictank/mdata/cache"
 	"github.com/grafana/metrictank/mdata/cache/accnt"
 	"github.com/grafana/metrictank/mdata/chunk"
+	"github.com/grafana/metrictank/mdata/memorystore"
 	"github.com/grafana/metrictank/test"
 	"gopkg.in/raintank/schema.v1"
 )
@@ -346,7 +347,10 @@ func TestGetSeriesFixed(t *testing.T) {
 	mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
 	mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
 
-	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
+	metrics := memorystore.NewAggMetrics(0, 0, 0)
+	mdata.Cache = &cache.MockCache{}
+	mdata.BackendStore = store
+	mdata.MemoryStore = metrics
 	srv, _ := NewServer()
 	srv.BindBackendStore(store)
 	srv.BindMemoryStore(metrics)
@@ -360,13 +364,14 @@
 	for from := uint32(11); from <= 20; from++ { // should always yield result with first point at 20 (because from is inclusive)
 		for to := uint32(31); to <= 40; to++ { // should always yield result with last point at 30 (because to is exclusive)
 			name := fmt.Sprintf("case.data.offset.%d.query:%d-%d", offset, from, to)
-
-			metric := metrics.GetOrCreate(name, name, 0, 0)
-			metric.Add(offset, 10)    // this point will always be quantized to 10
-			metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected
-			metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
-			metric.Add(30+offset, 40) // this point will always be quantized to 40
-			metric.Add(40+offset, 50) // this point will always be quantized to 50
+			agg := mdata.Aggregations.Get(0)
+			s := mdata.Schemas.Get(0)
+			metric, _ := metrics.LoadOrStore(name, memorystore.NewAggMetric(name, s.Retentions, s.ReorderWindow, &agg))
+			metric.Add(&schema.MetricPoint{name, offset, 10}, 0)      // this point will always be quantized to 10
+			metric.Add(&schema.MetricPoint{name, 10 + offset, 20}, 0) // this point will always be quantized to 20, so it should be selected
+			metric.Add(&schema.MetricPoint{name, 20 + offset, 30}, 0) // this point will always be quantized to 30, so it should be selected
+			metric.Add(&schema.MetricPoint{name, 30 + offset, 40}, 0) // this point will always be quantized to 40
+			metric.Add(&schema.MetricPoint{name, 40 + offset, 50}, 0) // this point will always be quantized to 50
 			req := models.NewReq(name, name, name, from, to, 1000, 10, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
 			req.ArchInterval = 10
 			points, err := srv.getSeriesFixed(test.NewContext(), req, consolidation.None)
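The "quantized to 10/20/..." comments describe how getSeriesFixed aligns raw points to the archive interval: each timestamp is moved up to the next multiple of the interval, so with interval 10 a point at 11..20 lands on 20. A minimal sketch of that rounding, assuming ceiling alignment (which is what the expected values in the comments imply):

// quantize rounds ts up to the nearest multiple of interval; timestamps
// that are already aligned stay put. With interval=10: 1..10 -> 10, 11..20 -> 20.
func quantize(ts, interval uint32) uint32 {
	if ts%interval == 0 {
		return ts
	}
	return ts - ts%interval + interval
}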
@@ -578,7 +583,10 @@ func TestGetSeriesCachedStore(t *testing.T) {
 	store := mdata.NewMockStore()
 	srv.BindBackendStore(store)
 
-	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
+	metrics := memorystore.NewAggMetrics(0, 0, 0)
+	mdata.Cache = &cache.MockCache{}
+	mdata.BackendStore = store
+	mdata.MemoryStore = metrics
 	srv.BindMemoryStore(metrics)
 	metric := "metric1"
 	var c *cache.CCache
@@ -757,7 +765,10 @@ func TestGetSeriesAggMetrics(t *testing.T) {
 	cluster.Init("default", "test", time.Now(), "http", 6060)
 	store := mdata.NewMockStore()
 
-	metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
+	metrics := memorystore.NewAggMetrics(0, 0, 0)
+	mdata.Cache = &cache.MockCache{}
+	mdata.BackendStore = store
+	mdata.MemoryStore = metrics
 	srv, _ := NewServer()
 	srv.BindBackendStore(store)
 	srv.BindMemoryStore(metrics)
@@ -768,10 +779,11 @@ func TestGetSeriesAggMetrics(t *testing.T) {
 	req := reqRaw(metricKey, from, to, 100, 10, consolidation.None, 0, 0)
 	req.ArchInterval = archInterval
 	ctx := newRequestContext(test.NewContext(), &req, consolidation.None)
-
-	metric := metrics.GetOrCreate(metricKey, metricKey, 0, 0)
+	agg := mdata.Aggregations.Get(0)
+	s := mdata.Schemas.Get(0)
+	metric, _ := metrics.LoadOrStore(metricKey, memorystore.NewAggMetric(metricKey, s.Retentions, s.ReorderWindow, &agg))
 	for i := uint32(50); i < 3000; i++ {
-		metric.Add(i, float64(i^2))
+		metric.Add(&schema.MetricPoint{metricKey, i, float64(i ^ 2)}, 0)
 	}
 
 	res, err := srv.getSeriesAggMetrics(ctx)
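Taken together, the new test setup implies roughly the following memorystore surface; these are assumptions read off the call sites, not the package's real declarations. The three zeros passed to NewAggMetrics likely map to the chunk-staleness, metric-staleness and GC-interval knobs that the old mdata.NewAggMetrics accepted after its store/cache arguments:

// Sketch of the API implied by the call sites in this diff.
package memorystore

import schema "gopkg.in/raintank/schema.v1"

// Metric is the per-series handle the tests feed points into. The second
// argument to Add appears to mirror the partition parameter threaded
// through the ingest path elsewhere in this PR.
type Metric interface {
	Add(point *schema.MetricPoint, partition int32)
}

// Metrics maps series keys to per-series handles with sync.Map-style
// semantics: LoadOrStore returns the existing entry for key, or stores m
// and returns it, the bool reporting whether an existing entry was loaded.
type Metrics interface {
	LoadOrStore(key string, m Metric) (Metric, bool)
}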
api/graphite.go: 2 changes (2 additions & 0 deletions)
@@ -366,6 +366,8 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFind) {
 		response.Write(ctx, response.NewMsgpack(200, findPickle(nodes, request, fromUnix, toUnix)))
 	case "pickle":
 		response.Write(ctx, response.NewPickle(200, findPickle(nodes, request, fromUnix, toUnix)))
+	case "raw":
+		response.Write(ctx, response.NewJson(200, nodes, request.Jsonp))
 	}
 }
 
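The new raw case skips the graphite-style tree/completer transforms and JSON-encodes the matched index nodes as-is via response.NewJson, honoring the jsonp parameter like the other JSON formats. A hypothetical client call; the endpoint path and port are assumptions based on Metrictank's usual defaults, not something this diff shows:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// format=raw is the new option; everything else is standard find usage.
	resp, err := http.Get("http://localhost:6060/metrics/find?query=some.metrics.*&format=raw")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // raw JSON-encoded index nodes
}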
api/models/graphite.go: 2 changes (1 addition & 1 deletion)
@@ -147,7 +147,7 @@ type GraphiteTagDelSeriesResp struct {
 type GraphiteFind struct {
 	FromTo
 	Query  string `json:"query" form:"query" binding:"Required"`
-	Format string `json:"format" form:"format" binding:"In(,completer,json,treejson,msgpack,pickle)"`
+	Format string `json:"format" form:"format" binding:"In(,completer,json,treejson,msgpack,pickle,raw)"`
 	Jsonp  string `json:"jsonp" form:"jsonp"`
 }
 
cmd/mt-kafka-mdm-sniff-out-of-order/main.go: 9 changes (8 additions & 1 deletion)
@@ -57,7 +57,14 @@ func newInputOOOFinder(format string) *inputOOOFinder {
 	}
 }
 
-func (ip *inputOOOFinder) Process(metric *schema.MetricData, partition int32) {
+func (ip *inputOOOFinder) Process(point schema.DataPoint, partition int32) {
+	metric := point.Data()
+	// we can only print full MetricData messages.
+	// TODO: we should update this tool to support printing the optimized
+	// schema.MetricPoint as well.
+	if metric == nil {
+		return
+	}
 	if *prefix != "" && !strings.HasPrefix(metric.Metric, *prefix) {
 		return
 	}
cmd/mt-kafka-mdm-sniff/main.go: 9 changes (8 additions & 1 deletion)
@@ -46,7 +46,14 @@ func newInputPrinter(format string) inputPrinter {
 	}
 }
 
-func (ip inputPrinter) Process(metric *schema.MetricData, partition int32) {
+func (ip inputPrinter) Process(point schema.DataPoint, partition int32) {
+	metric := point.Data()
+	// we can only print full MetricData messages.
+	// TODO: we should update this tool to support printing the optimized
+	// schema.MetricPoint as well.
+	if metric == nil {
+		return
+	}
 	if *prefix != "" && !strings.HasPrefix(metric.Metric, *prefix) {
 		return
 	}
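Both sniffer tools above now receive a schema.DataPoint instead of a *schema.MetricData, and bail out when Data() returns nil. That implies an interface along these lines; a sketch under the assumption that Data() is the only accessor these tools need (the real interface presumably carries more methods for the decoding path):

// Assumed shape of schema.DataPoint: both the full MetricData message and
// the optimized MetricPoint would satisfy it, but only the former can
// return complete metadata.
type DataPoint interface {
	// Data returns the full MetricData when this point carries one, or
	// nil for the slimmed-down MetricPoint encoding.
	Data() *MetricData
}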
cmd/mt-replicator-via-tsdb/main.go: 120 changes (0 additions & 120 deletions)

This file was deleted.
