Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 1eec530

Browse files
committedMar 19, 2018
idx & input: changing approach. no need for composite msg.Point type
instead, use different codepaths in the input handler and index this is a bit simpler
1 parent 1c511b7 commit 1eec530

File tree

8 files changed

+158
-90
lines changed

8 files changed

+158
-90
lines changed
 

‎idx/cassandra/cassandra.go

+52-10
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,40 @@ func (c *CasIdx) Stop() {
264264
c.session.Close()
265265
}
266266

267-
func (c *CasIdx) AddOrUpdate(point msg.Point, partition int32) idx.Archive {
267+
func (c *CasIdx) UpdateMaybe(point schema.MetricPointId2, partition int32) (idx.Archive, bool) {
268268
pre := time.Now()
269-
existing, inMemory := c.MemoryIdx.Get(data.Id)
270-
archive := c.MemoryIdx.AddOrUpdate(data, partition)
269+
mkey := schema.MKey{
270+
Key: point.MetricPointId1.Id,
271+
Org: point.Org,
272+
}
273+
274+
// note that both functions return an 'ok' bool.
275+
// abeit very unlikely,
276+
// the idx entry could be pruned in between the two calls and so they could be different
277+
existing, inMemory := c.MemoryIdx.Get(mkey)
278+
archive, inMemory2 := c.MemoryIdx.UpdateMaybe(point, partition)
279+
280+
if !updateCassIdx {
281+
statUpdateDuration.Value(time.Since(pre))
282+
return archive, inMemory2
283+
}
284+
285+
if inMemory {
286+
c.deleteOldIfMoved(existing, partition)
287+
}
288+
289+
if inMemory2 {
290+
archive = c.updateCassandraIfStale(inMemory, archive, partition)
291+
}
292+
293+
statUpdateDuration.Value(time.Since(pre))
294+
return archive, inMemory2
295+
}
296+
297+
func (c *CasIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) idx.Archive {
298+
pre := time.Now()
299+
existing, inMemory := c.MemoryIdx.Get(mkey)
300+
archive := c.MemoryIdx.AddOrUpdate(mkey, data, partition)
271301
stat := statUpdateDuration
272302
if !inMemory {
273303
stat = statAddDuration
@@ -277,22 +307,35 @@ func (c *CasIdx) AddOrUpdate(point msg.Point, partition int32) idx.Archive {
277307
return archive
278308
}
279309

280-
now := uint32(time.Now().Unix())
310+
if inMemory {
311+
c.deleteOldIfMoved(existing, partition)
312+
}
313+
314+
archive = c.updateCassandraIfStale(inMemory, archive, partition)
315+
stat.Value(time.Since(pre))
316+
return archive
317+
}
281318

282-
// Cassandra uses partition id as the partitioning key, so an "update" that changes the partition for
283-
// an existing metricDef will just create a new row in the table and wont remove the old row.
284-
// So we need to explicitly delete the old entry.
285-
if inMemory && existing.Partition != partition {
319+
// Cassandra uses partition id as the partitioning key, so an "update" that changes the partition for
320+
// an existing metricDef will just create a new row in the table and wont remove the old row.
321+
// So we need to explicitly delete the old entry.
322+
func (c *CasIdx) deleteOldIfMoved(existing idx.Archive, partition int32) {
323+
if existing.Partition != partition {
286324
go func() {
287325
if err := c.deleteDef(&existing); err != nil {
288326
log.Error(3, err.Error())
289327
}
290328
}()
291329
}
330+
}
331+
332+
// updateCassandraIfStale saves the archive to cassandra if needed and
333+
// updates the memory index with the updated fields.
334+
func (c *CasIdx) updateCassandraIfStale(inMemory bool, archive idx.Archive, partition int32) idx.Archive {
335+
now := uint32(time.Now().Unix())
292336

293337
// check if we need to save to cassandra.
294338
if archive.LastSave >= (now - updateInterval32) {
295-
stat.Value(time.Since(pre))
296339
return archive
297340
}
298341

@@ -320,7 +363,6 @@ func (c *CasIdx) AddOrUpdate(point msg.Point, partition int32) idx.Archive {
320363
}
321364
}
322365

323-
stat.Value(time.Since(pre))
324366
return archive
325367
}
326368

‎idx/idx.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"errors"
77
"time"
88

9-
"github.com/grafana/metrictank/msg"
10-
119
schema "gopkg.in/raintank/schema.v1"
1210
)
1311

@@ -58,9 +56,13 @@ type MetricIndex interface {
5856
// Stop shuts down the index.
5957
Stop()
6058

59+
// UpdateMaybe updates an existing archive, if found.
60+
// it returns the existing archive (if any), and whether it was found
61+
UpdateMaybe(point schema.MetricPointId2, partition int32) (Archive, bool)
62+
6163
// AddOrUpdate makes sure a metric is known in the index,
6264
// and should be called for every received metric.
63-
AddOrUpdate(point msg.Point, partition int32) Archive
65+
AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) Archive
6466

6567
// Get returns the archive for the requested id.
6668
Get(key schema.MKey) (Archive, bool)

‎idx/memory/memory.go

+33-3
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,43 @@ func (m *MemoryIdx) Stop() {
206206
return
207207
}
208208

209-
func (m *MemoryIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archive {
209+
// UpdateMaybe updates an existing archive, if found.
210+
// it returns the existing archive (if any), and whether it was found
211+
func (m *MemoryIdx) UpdateMaybe(point schema.MetricPointId2, partition int32) (idx.Archive, bool) {
212+
pre := time.Now()
213+
214+
mkey := schema.MKey{
215+
Key: point.MetricPointId1.Id,
216+
Org: point.Org,
217+
}
218+
219+
m.Lock()
220+
defer m.Unlock()
221+
222+
existing, ok := m.defById[mkey]
223+
if ok {
224+
log.Debug("metricDef with id %v already in index", mkey)
225+
existing.LastUpdate = int64(point.MetricPointId1.Time)
226+
existing.Partition = partition
227+
statUpdate.Inc()
228+
statUpdateDuration.Value(time.Since(pre))
229+
return *existing, true
230+
}
231+
232+
return idx.Archive{}, false
233+
}
234+
235+
// AddOrUpdate returns the corresponding Archive for the MetricData.
236+
// if it is existing -> updates lastUpdate based on .Time, and partition
237+
// if was new -> adds new MetricDefinition to index
238+
func (m *MemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) idx.Archive {
210239
pre := time.Now()
211240
m.Lock()
212241
defer m.Unlock()
213-
existing, ok := m.defById[data.Id]
242+
243+
existing, ok := m.defById[mkey]
214244
if ok {
215-
log.Debug("metricDef with id %s already in index.", data.Id)
245+
log.Debug("metricDef with id %s already in index.", mkey)
216246
existing.LastUpdate = data.Time
217247
existing.Partition = partition
218248
statUpdate.Inc()

‎input/carbon/carbon.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
"github.com/grafana/metrictank/cluster"
1414
"github.com/grafana/metrictank/input"
15-
"github.com/grafana/metrictank/msg"
1615
"github.com/grafana/metrictank/stats"
1716
"github.com/metrics20/go-metrics20/carbon20"
1817
"github.com/raintank/worldping-api/pkg/log"
@@ -186,8 +185,7 @@ func (c *Carbon) handle(conn net.Conn) {
186185
continue
187186
}
188187
nameSplits := strings.Split(string(key), ";")
189-
pointMsg := msg.Point{}
190-
pointMsg.Md = &schema.MetricData{
188+
md := &schema.MetricData{
191189
Name: nameSplits[0],
192190
Metric: nameSplits[0],
193191
Interval: c.intervalGetter.GetInterval(nameSplits[0]),
@@ -198,9 +196,9 @@ func (c *Carbon) handle(conn net.Conn) {
198196
Tags: nameSplits[1:],
199197
OrgId: 1, // admin org
200198
}
201-
pointMsg.Md.SetId()
199+
md.SetId()
202200
metricsPerMessage.ValueUint32(1)
203-
c.Handler.Process(pointMsg, int32(partitionId))
201+
c.Handler.ProcessMetricData(md, int32(partitionId))
204202
}
205203
c.handlerWaitGroup.Done()
206204
}

‎input/input.go

+50-42
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ import (
1010

1111
"github.com/grafana/metrictank/idx"
1212
"github.com/grafana/metrictank/mdata"
13-
"github.com/grafana/metrictank/msg"
1413
"github.com/grafana/metrictank/stats"
1514
"github.com/raintank/worldping-api/pkg/log"
1615
)
1716

1817
type Handler interface {
19-
Process(pointMsg msg.Point, partition int32)
18+
ProcessMetricData(md *schema.MetricData, partition int32)
19+
ProcessMetricPoint(point schema.MetricPointId2, partition int32)
2020
}
2121

2222
// TODO: clever way to document all metrics for all different inputs
@@ -46,55 +46,63 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
4646
}
4747
}
4848

49-
// process makes sure the data is stored and the metadata is in the index
49+
// ProcessMetricPoint updates the index if possible, and stores the data if we have an index entry
5050
// concurrency-safe.
51-
func (in DefaultHandler) Process(pointMsg msg.Point, partition int32) {
51+
func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPointId2, partition int32) {
5252
in.metricsReceived.Inc()
53-
var mkey schema.MKey
54-
var timestamp uint32
55-
var value float64
56-
57-
if pointMsg.Val == 0 {
58-
err := pointMsg.Md.Validate()
59-
if err != nil {
60-
in.MetricInvalid.Inc()
61-
log.Debug("in: Invalid metric %v: %s", pointMsg.Md, err)
62-
return
63-
}
64-
if pointMsg.Md.Time == 0 {
65-
in.MetricInvalid.Inc()
66-
log.Warn("in: invalid metric. metric.Time is 0. %s", pointMsg.Md.Id)
67-
return
68-
}
69-
70-
mkey, err = schema.MKeyFromString(pointMsg.Md.Id)
71-
if err != nil {
72-
log.Debug("in: Invalid metric %v: %s", pointMsg.Md, err)
73-
}
74-
75-
timestamp = uint32(pointMsg.Md.Time)
76-
value = pointMsg.Md.Value
77-
} else {
78-
if !pointMsg.Point.Valid() {
79-
in.MetricInvalid.Inc()
80-
log.Debug("in: Invalid metric %v", pointMsg.Point)
81-
return
82-
}
83-
mkey = schema.MKey{
84-
Key: pointMsg.Point.MetricPointId1.Id,
85-
Org: pointMsg.Point.Org,
86-
}
87-
timestamp = pointMsg.Point.MetricPointId1.Time
88-
value = pointMsg.Point.MetricPointId1.Value
53+
if !point.Valid() {
54+
in.MetricInvalid.Inc()
55+
log.Debug("in: Invalid metric %v", point)
56+
return
57+
}
58+
mkey := schema.MKey{
59+
Key: point.MetricPointId1.Id,
60+
Org: point.Org,
61+
}
62+
63+
pre := time.Now()
64+
archive, ok := in.metricIndex.UpdateMaybe(point, partition)
65+
in.pressureIdx.Add(int(time.Since(pre).Nanoseconds()))
66+
67+
if !ok {
68+
return
69+
}
70+
71+
pre = time.Now()
72+
m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId)
73+
m.Add(point.MetricPointId1.Time, point.MetricPointId1.Value)
74+
in.pressureTank.Add(int(time.Since(pre).Nanoseconds()))
75+
76+
}
77+
78+
// ProcessMetricData assures the data is stored and the metadata is in the index
79+
// concurrency-safe.
80+
func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int32) {
81+
in.metricsReceived.Inc()
82+
err := md.Validate()
83+
if err != nil {
84+
in.MetricInvalid.Inc()
85+
log.Debug("in: Invalid metric %v: %s", md, err)
86+
return
87+
}
88+
if md.Time == 0 {
89+
in.MetricInvalid.Inc()
90+
log.Warn("in: invalid metric. metric.Time is 0. %s", md.Id)
91+
return
92+
}
8993

94+
mkey, err := schema.MKeyFromString(md.Id)
95+
if err != nil {
96+
log.Error(3, "in: Invalid metric %v: could not parse ID: %s", md, err)
97+
return
9098
}
9199

92100
pre := time.Now()
93-
archive := in.metricIndex.AddOrUpdate(pointMsg, partition)
101+
archive := in.metricIndex.AddOrUpdate(mkey, md, partition)
94102
in.pressureIdx.Add(int(time.Since(pre).Nanoseconds()))
95103

96104
pre = time.Now()
97105
m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId)
98-
m.Add(timestamp, value)
106+
m.Add(uint32(md.Time), md.Value)
99107
in.pressureTank.Add(int(time.Since(pre).Nanoseconds()))
100108
}

‎input/kafkamdm/kafkamdm.go

+12-11
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ import (
88
"sync"
99
"time"
1010

11+
schema "gopkg.in/raintank/schema.v1"
12+
1113
"github.com/Shopify/sarama"
1214
"github.com/raintank/worldping-api/pkg/log"
1315
"github.com/rakyll/globalconf"
1416

1517
"github.com/grafana/metrictank/cluster"
1618
"github.com/grafana/metrictank/input"
1719
"github.com/grafana/metrictank/kafka"
18-
"github.com/grafana/metrictank/msg"
1920
"github.com/grafana/metrictank/stats"
2021
)
2122

@@ -351,30 +352,30 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
351352
}
352353

353354
func (k *KafkaMdm) handleMsg(data []byte, partition int32) {
354-
pointMsg := msg.Point{}
355355
if len(data) == 29 && data[0] == 0 {
356-
pointMsg.Val = 1
357-
pointMsg.Point.MetricPointId1.UnmarshalDirect(data[1:])
358-
pointMsg.Point.Org = uint32(orgId)
359-
k.Handler.Process(pointMsg, partition)
356+
var point schema.MetricPointId2
357+
point.MetricPointId1.UnmarshalDirect(data[1:])
358+
point.Org = uint32(orgId)
359+
k.Handler.ProcessMetricPoint(point, partition)
360360
return
361361
}
362362

363363
if len(data) == 33 && data[0] == 1 {
364-
pointMsg.Val = 1
365-
pointMsg.Point.UnmarshalDirect(data[1:])
366-
k.Handler.Process(pointMsg, partition)
364+
var point schema.MetricPointId2
365+
point.UnmarshalDirect(data[1:])
366+
k.Handler.ProcessMetricPoint(point, partition)
367367
return
368368
}
369369

370-
_, err := pointMsg.Md.UnmarshalMsg(data)
370+
md := schema.MetricData{}
371+
_, err := md.UnmarshalMsg(data)
371372
if err != nil {
372373
metricsDecodeErr.Inc()
373374
log.Error(3, "kafka-mdm decode error, skipping message. %s", err)
374375
return
375376
}
376377
metricsPerMessage.ValueUint32(1)
377-
k.Handler.Process(pointMsg, partition)
378+
k.Handler.ProcessMetricData(&md, partition)
378379
}
379380

380381
// Stop will initiate a graceful stop of the Consumer (permanent)

‎input/prometheus/prometheus.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/golang/snappy"
1111
"github.com/grafana/metrictank/cluster"
1212
"github.com/grafana/metrictank/input"
13-
"github.com/grafana/metrictank/msg"
1413
"github.com/prometheus/common/model"
1514
"github.com/prometheus/prometheus/prompb"
1615
"github.com/raintank/worldping-api/pkg/log"
@@ -100,8 +99,7 @@ func (p *prometheusWriteHandler) handle(w http.ResponseWriter, req *http.Request
10099
}
101100
if name != "" {
102101
for _, sample := range ts.Samples {
103-
pointMsg := msg.Point{}
104-
pointMsg.Md = &schema.MetricData{
102+
md := &schema.MetricData{
105103
Name: name,
106104
Metric: name,
107105
Interval: 15,
@@ -112,8 +110,8 @@ func (p *prometheusWriteHandler) handle(w http.ResponseWriter, req *http.Request
112110
Tags: tagSet,
113111
OrgId: 1,
114112
}
115-
pointMsg.Md.SetId()
116-
p.Process(pointMsg, int32(partitionID))
113+
md.SetId()
114+
p.ProcessMetricData(md, int32(partitionID))
117115
}
118116
} else {
119117
w.WriteHeader(400)

‎msg/msg.go

-11
This file was deleted.

0 commit comments

Comments
 (0)
This repository has been archived.