Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 65c69af

Browse files
author
woodsaj
committed
input refactor
support interval autodetection and streamlined datapoint format
1 parent c134e22 commit 65c69af

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1707
-909
lines changed

api/ccache_test.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/grafana/metrictank/idx/memory"
1616
"github.com/grafana/metrictank/mdata"
1717
"github.com/grafana/metrictank/mdata/cache"
18+
"github.com/grafana/metrictank/mdata/memorystore"
1819
"gopkg.in/raintank/schema.v1"
1920
)
2021

@@ -32,11 +33,15 @@ func newSrv(delSeries, delArchives int) (*Server, *cache.MockCache) {
3233
mockCache := cache.NewMockCache()
3334
mockCache.DelMetricSeries = delSeries
3435
mockCache.DelMetricArchives = delArchives
35-
metrics := mdata.NewAggMetrics(store, mockCache, false, 0, 0, 0)
36+
metrics := memorystore.NewAggMetrics(0, 0, 0)
37+
mdata.MemoryStore = metrics
38+
mdata.BackendStore = store
39+
mdata.Cache = mockCache
3640
srv.BindMemoryStore(metrics)
3741
srv.BindCache(mockCache)
3842

3943
metricIndex := memory.New()
44+
mdata.Idx = metricIndex
4045
srv.BindMetricIndex(metricIndex)
4146
return srv, mockCache
4247
}

api/dataprocessor_test.go

+25-13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/grafana/metrictank/mdata/cache"
1616
"github.com/grafana/metrictank/mdata/cache/accnt"
1717
"github.com/grafana/metrictank/mdata/chunk"
18+
"github.com/grafana/metrictank/mdata/memorystore"
1819
"github.com/grafana/metrictank/test"
1920
"gopkg.in/raintank/schema.v1"
2021
)
@@ -346,7 +347,10 @@ func TestGetSeriesFixed(t *testing.T) {
346347
mdata.SetSingleAgg(conf.Avg, conf.Min, conf.Max)
347348
mdata.SetSingleSchema(conf.NewRetentionMT(10, 100, 600, 10, true))
348349

349-
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
350+
metrics := memorystore.NewAggMetrics(0, 0, 0)
351+
mdata.Cache = &cache.MockCache{}
352+
mdata.BackendStore = store
353+
mdata.MemoryStore = metrics
350354
srv, _ := NewServer()
351355
srv.BindBackendStore(store)
352356
srv.BindMemoryStore(metrics)
@@ -360,13 +364,14 @@ func TestGetSeriesFixed(t *testing.T) {
360364
for from := uint32(11); from <= 20; from++ { // should always yield result with first point at 20 (because from is inclusive)
361365
for to := uint32(31); to <= 40; to++ { // should always yield result with last point at 30 (because to is exclusive)
362366
name := fmt.Sprintf("case.data.offset.%d.query:%d-%d", offset, from, to)
363-
364-
metric := metrics.GetOrCreate(name, name, 0, 0)
365-
metric.Add(offset, 10) // this point will always be quantized to 10
366-
metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected
367-
metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected
368-
metric.Add(30+offset, 40) // this point will always be quantized to 40
369-
metric.Add(40+offset, 50) // this point will always be quantized to 50
367+
agg := mdata.Aggregations.Get(0)
368+
s := mdata.Schemas.Get(0)
369+
metric, _ := metrics.LoadOrStore(name, memorystore.NewAggMetric(name, s.Retentions, s.ReorderWindow, &agg))
370+
metric.Add(&schema.MetricPoint{name, offset, 10}) // this point will always be quantized to 10
371+
metric.Add(&schema.MetricPoint{name, 10 + offset, 20}) // this point will always be quantized to 20, so it should be selected
372+
metric.Add(&schema.MetricPoint{name, 20 + offset, 30}) // this point will always be quantized to 30, so it should be selected
373+
metric.Add(&schema.MetricPoint{name, 30 + offset, 40}) // this point will always be quantized to 40
374+
metric.Add(&schema.MetricPoint{name, 40 + offset, 50}) // this point will always be quantized to 50
370375
req := models.NewReq(name, name, name, from, to, 1000, 10, consolidation.Avg, 0, cluster.Manager.ThisNode(), 0, 0)
371376
req.ArchInterval = 10
372377
points, err := srv.getSeriesFixed(test.NewContext(), req, consolidation.None)
@@ -578,7 +583,10 @@ func TestGetSeriesCachedStore(t *testing.T) {
578583
store := mdata.NewMockStore()
579584
srv.BindBackendStore(store)
580585

581-
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
586+
metrics := memorystore.NewAggMetrics(0, 0, 0)
587+
mdata.Cache = &cache.MockCache{}
588+
mdata.BackendStore = store
589+
mdata.MemoryStore = metrics
582590
srv.BindMemoryStore(metrics)
583591
metric := "metric1"
584592
var c *cache.CCache
@@ -757,7 +765,10 @@ func TestGetSeriesAggMetrics(t *testing.T) {
757765
cluster.Init("default", "test", time.Now(), "http", 6060)
758766
store := mdata.NewMockStore()
759767

760-
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
768+
metrics := memorystore.NewAggMetrics(0, 0, 0)
769+
mdata.Cache = &cache.MockCache{}
770+
mdata.BackendStore = store
771+
mdata.MemoryStore = metrics
761772
srv, _ := NewServer()
762773
srv.BindBackendStore(store)
763774
srv.BindMemoryStore(metrics)
@@ -768,10 +779,11 @@ func TestGetSeriesAggMetrics(t *testing.T) {
768779
req := reqRaw(metricKey, from, to, 100, 10, consolidation.None, 0, 0)
769780
req.ArchInterval = archInterval
770781
ctx := newRequestContext(test.NewContext(), &req, consolidation.None)
771-
772-
metric := metrics.GetOrCreate(metricKey, metricKey, 0, 0)
782+
agg := mdata.Aggregations.Get(0)
783+
s := mdata.Schemas.Get(0)
784+
metric, _ := metrics.LoadOrStore(metricKey, memorystore.NewAggMetric(metricKey, s.Retentions, s.ReorderWindow, &agg))
773785
for i := uint32(50); i < 3000; i++ {
774-
metric.Add(i, float64(i^2))
786+
metric.Add(&schema.MetricPoint{metricKey, i, float64(i ^ 2)})
775787
}
776788

777789
res, err := srv.getSeriesAggMetrics(ctx)

api/graphite.go

+2
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,8 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
366366
response.Write(ctx, response.NewMsgpack(200, findPickle(nodes, request, fromUnix, toUnix)))
367367
case "pickle":
368368
response.Write(ctx, response.NewPickle(200, findPickle(nodes, request, fromUnix, toUnix)))
369+
case "raw":
370+
response.Write(ctx, response.NewJson(200, nodes, request.Jsonp))
369371
}
370372
}
371373

api/models/graphite.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ type GraphiteTagDelSeriesResp struct {
147147
type GraphiteFind struct {
148148
FromTo
149149
Query string `json:"query" form:"query" binding:"Required"`
150-
Format string `json:"format" form:"format" binding:"In(,completer,json,treejson,msgpack,pickle)"`
150+
Format string `json:"format" form:"format" binding:"In(,completer,json,treejson,msgpack,pickle,raw)"`
151151
Jsonp string `json:"jsonp" form:"jsonp"`
152152
}
153153

cmd/mt-kafka-mdm-sniff-out-of-order/main.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,14 @@ func newInputOOOFinder(format string) *inputOOOFinder {
5757
}
5858
}
5959

60-
func (ip *inputOOOFinder) Process(metric *schema.MetricData, partition int32) {
60+
func (ip *inputOOOFinder) Process(point schema.DataPoint, partition int32) {
61+
metric := point.Data()
62+
// we can only print full metricData messages.
63+
// TODO: we should update this tool to support printing the optimized
64+
// schame.MetricPoint as well.
65+
if metric == nil {
66+
return
67+
}
6168
if *prefix != "" && !strings.HasPrefix(metric.Metric, *prefix) {
6269
return
6370
}

cmd/mt-kafka-mdm-sniff/main.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,14 @@ func newInputPrinter(format string) inputPrinter {
4646
}
4747
}
4848

49-
func (ip inputPrinter) Process(metric *schema.MetricData, partition int32) {
49+
func (ip inputPrinter) Process(point schema.DataPoint, partition int32) {
50+
metric := point.Data()
51+
// we can only print full metricData messages.
52+
// TODO: we should update this tool to support printing the optimized
53+
// schame.MetricPoint as well.
54+
if metric == nil {
55+
return
56+
}
5057
if *prefix != "" && !strings.HasPrefix(metric.Metric, *prefix) {
5158
return
5259
}

cmd/mt-replicator-via-tsdb/main.go

-120
This file was deleted.

0 commit comments

Comments
 (0)