This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit c596193

Author: woodsaj
store save state as a property of the aggmetric instead of the chunk
Rather than tracking the state of each individual chunk, just keep a record of the most recent saveStart (added to the write queue) / saveFinish (written to Cassandra) as properties of the AggMetric. When we save chunks we always save all unsaved chunks, so we don't lose anything by tracking the save state for all chunks in one variable. issue #452
1 parent 79c34d9 commit c596193
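
In essence, the commit replaces the per-chunk Saved/Saving flags with two watermarks kept on the AggMetric itself. A minimal sketch of that bookkeeping, using simplified stand-in types rather than the actual metrictank structs (the method names here are illustrative only):

```go
package main

import (
	"fmt"
	"sync"
)

// aggMetricState is a stand-in for the save-state bookkeeping this commit
// adds to AggMetric: two watermarks instead of per-chunk Saved/Saving flags.
type aggMetricState struct {
	sync.Mutex
	lastSaveStart  uint32 // T0 of the newest chunk handed to the write queue
	lastSaveFinish uint32 // T0 of the newest chunk confirmed written to Cassandra
}

// markSaveStart records that every chunk with T0 <= t0 is queued for saving
// (roughly what persist() does after pushing chunks onto the write queue).
func (m *aggMetricState) markSaveStart(t0 uint32) {
	m.Lock()
	defer m.Unlock()
	if t0 > m.lastSaveStart {
		m.lastSaveStart = t0
	}
}

// markSaveFinish records that every chunk with T0 <= t0 is persisted
// (roughly what SyncChunkSaveState does when a save is confirmed).
func (m *aggMetricState) markSaveFinish(t0 uint32) {
	m.Lock()
	defer m.Unlock()
	if t0 > m.lastSaveFinish {
		m.lastSaveFinish = t0
	}
	if t0 > m.lastSaveStart {
		m.lastSaveStart = t0
	}
}

// needsSave reports whether a chunk with the given T0 still has to be queued.
func (m *aggMetricState) needsSave(t0 uint32) bool {
	m.Lock()
	defer m.Unlock()
	return t0 > m.lastSaveStart
}

func main() {
	m := &aggMetricState{}
	m.markSaveStart(600)           // chunk T0=600 added to the write queue
	m.markSaveFinish(600)          // write to Cassandra confirmed
	fmt.Println(m.needsSave(600))  // false: already queued/saved
	fmt.Println(m.needsSave(1200)) // true: newer chunk, still unsaved
}
```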

File tree: 7 files changed, +79 -161 lines


dashboard.json (+4, -8)
@@ -38,7 +38,7 @@
     {
       "aliasColors": {
         "ES backpressure": "#052B51",
-        "add-to-saving": "#C15C17",
+        "add-to-closed": "#C15C17",
         "primary": "#2F575E",
         "store backpressure": "#962D82",
         "too old": "#890F02"
@@ -124,22 +124,18 @@
     },
     {
       "refId": "C",
-      "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saved_chunk.counter32), 'add-to-saved')"
+      "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_closed_chunk.counter32), 'add-to-closed')"
     },
     {
       "refId": "D",
-      "target": "alias(perSecond(metrictank.stats.$environment.$instance.tank.add_to_saving_chunk.counter32), 'add-to-saving')"
-    },
-    {
-      "refId": "E",
       "target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32), '.*\\.([^\\.]+)\\.metrics_decode_err.*', '\\1 decode err')"
     },
     {
-      "refId": "F",
+      "refId": "E",
       "target": "aliasSub(perSecond(metrictank.stats.$environment.$instance.input.*.metric_invalid.counter32), '.*\\.([^\\.]+)\\.metric_invalid.*', '\\1 metric invalid')"
     },
     {
-      "refId": "G",
+      "refId": "F",
       "target": "aliasByNode(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.*.counter32), 0.000000001), 6, 7)"
     }
   ],

docs/metrics.md (+5, -9)
@@ -144,15 +144,11 @@ the duration of a put in the wait queue
 how many rows come per get response
 * `store.cassandra.to_iter`:
 the duration of converting chunks to iterators
-* `tank.add_to_saved_chunk`:
-points received - by a secondary node - for the most recent chunk when that chunk
-has already been saved by a primary. A secondary can add this data to its chunks.
-* `tank.add_to_saving_chunk`:
-points received - by the primary node - for the most recent chunk
-when that chunk is already being saved (or has been saved).
-this indicates that your GC is actively sealing chunks and saving them before you have the chance to send
-your (infrequent) updates. The primary won't add them to its in-memory chunks, but secondaries will
-(because they are never in "saving" state for them), see below.
+* `tank.add_to_closed_chunk`:
+points received for the most recent chunk when that chunk is already being "closed",
+ie the end-of-stream marker has been written to the chunk.
+This indicates that your GC is actively sealing chunks and saving them before you have the chance to send
+your (infrequent) updates. Any points received for a chunk that has already been closed are discarded.
 * `tank.chunk_operations.clear`:
 a counter of how many chunks are cleared (replaced by new chunks)
 * `tank.chunk_operations.create`:
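
For context, the renamed counter is driven by the Add() path shown in the mdata/aggmetric.go diff below: once a chunk carries the end-of-stream marker it is considered closed, and any late point that maps to it is counted and dropped. A rough, self-contained illustration with hypothetical stand-in types (not the real metrictank code):

```go
package main

import "fmt"

// fakeChunk is a simplified, hypothetical type, not the real chunk.Chunk.
type fakeChunk struct {
	T0     uint32
	Closed bool // end-of-stream marker already written
}

// addToClosedChunk stands in for the real counter32 behind tank.add_to_closed_chunk.
var addToClosedChunk int

// addPoint mirrors the relevant branch of AggMetric.Add(): a point whose T0
// maps to the current chunk, arriving after that chunk was closed, is counted
// and discarded instead of being pushed.
func addPoint(current *fakeChunk, ts, chunkSpan uint32) bool {
	t0 := ts - (ts % chunkSpan)
	if t0 == current.T0 && current.Closed {
		addToClosedChunk++ // late point for a sealed chunk: drop it
		return false
	}
	// normal push / new-chunk handling would go here
	return true
}

func main() {
	cur := &fakeChunk{T0: 600, Closed: true}
	fmt.Println(addPoint(cur, 630, 600)) // false: chunk already closed
	fmt.Println(addToClosedChunk)        // 1
}
```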

mdata/aggmetric.go (+51, -125)
@@ -30,6 +30,9 @@ type AggMetric struct {
 	aggregators  []*Aggregator
 	firstChunkT0 uint32
 	ttl          uint32
+	lastSaveStart  uint32 // last chunk T0 that was added to the write Queue.
+	lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
+	lastWrite      uint32
 }
 
 // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
@@ -42,6 +45,7 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
 		NumChunks: numChunks,
 		Chunks:    make([]*chunk.Chunk, 0, numChunks),
 		ttl:       ttl,
+		lastWrite: uint32(time.Now().Unix()),
 	}
 	for _, as := range aggsetting {
 		m.aggregators = append(m.aggregators, NewAggregator(store, key, as.Span, as.ChunkSpan, as.NumChunks, as.Ttl))
@@ -54,99 +58,15 @@ func NewAggMetric(store Store, key string, chunkSpan, numChunks uint32, ttl uint
 func (a *AggMetric) SyncChunkSaveState(ts uint32) {
 	a.Lock()
 	defer a.Unlock()
-	chunk := a.getChunkByT0(ts)
-	if chunk != nil {
-		if LogLevel < 2 {
-			log.Debug("AM marking chunk %s:%d as saved.", a.Key, chunk.T0)
-		}
-		chunk.Saved = true
-	}
-}
-
-/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock()*/
-func (a *AggMetric) getChunkByT0(ts uint32) *chunk.Chunk {
-	// we have no chunks.
-	if len(a.Chunks) == 0 {
-		return nil
-	}
-
-	currentT0 := a.Chunks[a.CurrentChunkPos].T0
-
-	if ts == currentT0 {
-		//found our chunk.
-		return a.Chunks[a.CurrentChunkPos]
-	}
-
-	// requested Chunk is not in our dataset.
-	if ts > currentT0 {
-		return nil
-	}
-
-	// requested Chunk is not in our dataset.
-	if len(a.Chunks) == 1 {
-		return nil
-	}
-
-	// calculate the number of chunks ago our requested T0 is,
-	// assuming that chunks are sequential.
-	chunksAgo := int((currentT0 - ts) / a.ChunkSpan)
-
-	numChunks := len(a.Chunks)
-	oldestPos := a.CurrentChunkPos + 1
-	if oldestPos >= numChunks {
-		oldestPos = 0
+	if ts > a.lastSaveFinish {
+		a.lastSaveFinish = ts
 	}
-
-	var guess int
-
-	if chunksAgo >= (numChunks - 1) {
-		// set guess to the oldest chunk.
-		guess = oldestPos
-	} else {
-		guess = a.CurrentChunkPos - chunksAgo
-		if guess < 0 {
-			guess += numChunks
-		}
+	if ts > a.lastSaveStart {
+		a.lastSaveStart = ts
 	}
-
-	// we now have a good guess at which chunk position our requested TO is in.
-	c := a.Chunks[guess]
-
-	if c.T0 == ts {
-		// found our chunk.
-		return c
-	}
-
-	if ts > c.T0 {
-		// we need to check newer chunks
-		for c.T0 < currentT0 {
-			guess += 1
-			if guess >= numChunks {
-				guess = 0
-			}
-			c = a.Chunks[guess]
-			if c.T0 == ts {
-				//found our chunk
-				return c
-			}
-		}
-	} else {
-		// we need to check older chunks
-		oldestT0 := a.Chunks[oldestPos].T0
-		for c.T0 >= oldestT0 && c.T0 < currentT0 {
-			guess -= 1
-			if guess < 0 {
-				guess += numChunks
-			}
-			c = a.Chunks[guess]
-			if c.T0 == ts {
-				//found or chunk.
-				return c
-			}
-		}
+	if LogLevel < 2 {
+		log.Debug("AM metric %s at chunk T0=%d has been saved.", a.Key, ts)
 	}
-	// chunk not found.
-	return nil
 }
 
 func (a *AggMetric) getChunk(pos int) *chunk.Chunk {
@@ -332,18 +252,12 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
 
 // write a chunk to persistent storage. This should only be called while holding a.Lock()
 func (a *AggMetric) persist(pos int) {
-	if !cluster.ThisNode.IsPrimary() {
-		if LogLevel < 2 {
-			log.Debug("AM persist(): node is not primary, not saving chunk.")
-		}
-		return
-	}
-
-	pre := time.Now()
 	chunk := a.Chunks[pos]
+	pre := time.Now()
 
-	if chunk.Saved || chunk.Saving {
-		// this can happen if chunk was persisted by GC (stale) and then new data triggered another persist call
+	if a.lastSaveStart >= chunk.T0 {
+		// this can happen if there are 2 primary MT nodes both saving chunks to Cassandra or
+		// a primary failed and this node was promoted to be primary but metric consuming is lagging.
 		log.Debug("AM persist(): duplicate persist call for chunk.")
 		return
 	}
@@ -352,11 +266,12 @@ func (a *AggMetric) persist(pos int) {
 	pending := make([]*ChunkWriteRequest, 1)
 	// add the current chunk to the list of chunks to send to the writeQueue
 	pending[0] = &ChunkWriteRequest{
+		metric:    a,
 		key:       a.Key,
-		chunk:     chunk,
+		span:      a.ChunkSpan,
 		ttl:       a.ttl,
+		chunk:     chunk,
 		timestamp: time.Now(),
-		span:      a.ChunkSpan,
 	}
 
 	// if we recently became the primary, there may be older chunks
@@ -367,16 +282,17 @@ func (a *AggMetric) persist(pos int) {
 		previousPos += len(a.Chunks)
 	}
 	previousChunk := a.Chunks[previousPos]
-	for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving {
+	for (previousChunk.T0 < chunk.T0) && (a.lastSaveStart < previousChunk.T0) {
 		if LogLevel < 2 {
 			log.Debug("AM persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0)
 		}
 		pending = append(pending, &ChunkWriteRequest{
+			metric:    a,
 			key:       a.Key,
-			chunk:     previousChunk,
+			span:      a.ChunkSpan,
 			ttl:       a.ttl,
+			chunk:     previousChunk,
 			timestamp: time.Now(),
-			span:      a.ChunkSpan,
 		})
 		previousPos--
 		if previousPos < 0 {
@@ -385,6 +301,9 @@ func (a *AggMetric) persist(pos int) {
 		previousChunk = a.Chunks[previousPos]
 	}
 
+	// Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue.
+	a.lastSaveStart = chunk.T0
+
 	if LogLevel < 2 {
 		log.Debug("AM persist(): sending %d chunks to write queue", len(pending))
 	}
@@ -402,9 +321,8 @@ func (a *AggMetric) persist(pos int) {
 		if LogLevel < 2 {
 			log.Debug("AM persist(): sealing chunk %d/%d (%s:%d) and adding to write queue.", pendingChunk, len(pending), a.Key, chunk.T0)
 		}
-		pending[pendingChunk].chunk.Finish()
 		a.store.Add(pending[pendingChunk])
-		pending[pendingChunk].chunk.Saving = true
+
 		pendingChunk--
 	}
 	persistDuration.Value(time.Now().Sub(pre))
@@ -432,6 +350,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
 		}
 
 		log.Debug("AM %s Add(): created first chunk with first point: %v", a.Key, a.Chunks[0])
+		a.lastWrite = uint32(time.Now().Unix())
 		a.addAggregators(ts, val)
 		return
 	}
@@ -440,34 +359,40 @@ func (a *AggMetric) Add(ts uint32, val float64) {
 
 	if t0 == currentChunk.T0 {
 		// last prior data was in same chunk as new point
-		if currentChunk.Saving {
-			// if we're already saving the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
+		if currentChunk.Closed {
+			// if we've already 'finished' the chunk, it means it has the end-of-stream marker and any new points behind it wouldn't be read by an iterator
 			// you should monitor this metric closely, it indicates that maybe your GC settings don't match how you actually send data (too late)
-			addToSavingChunk.Inc()
+			addToClosedChunk.Inc()
 			return
 		}
 
-		if err := currentChunk.Push(ts, val); err == nil {
-			if currentChunk.Saved {
-				// if we're here, it means we marked it as Saved because it was saved by an other primary, not by us since Saving is false.
-				// typically this happens when non-primaries receive metrics that the primary already saved (maybe cause their metrics consumer is laggy)
-				// we allow adding data to such chunks in that case, though this open the possibility for data to be rejected by the primary, to be
-				// visible on secondaries.
-				addToSavedChunk.Inc()
-			}
-		} else {
+		if err := currentChunk.Push(ts, val); err != nil {
 			log.Debug("AM failed to add metric to chunk for %s. %s", a.Key, err)
 			metricsTooOld.Inc()
 			return
 		}
+		a.lastWrite = uint32(time.Now().Unix())
 		log.Debug("AM %s Add(): pushed new value to last chunk: %v", a.Key, a.Chunks[0])
 	} else if t0 < currentChunk.T0 {
 		log.Debug("AM Point at %d has t0 %d, goes back into previous chunk. CurrentChunk t0: %d, LastTs: %d", ts, t0, currentChunk.T0, currentChunk.LastTs)
 		metricsTooOld.Inc()
 		return
 	} else {
-		// persist the chunk. If the writeQueue is full, then this will block.
-		a.persist(a.CurrentChunkPos)
+		// Data belongs in a new chunk.
+
+		// If it isnt finished already, add the end-of-stream marker and flag the chunk as "closed"
+		if !currentChunk.Closed {
+			currentChunk.Finish()
+		}
+
+		// If we are a primary node, then add the chunk to the write queue to be saved to Cassandra
+		if cluster.ThisNode.IsPrimary() {
+			if LogLevel < 2 {
+				log.Debug("AM persist(): node is primary, saving chunk.")
+			}
+			// persist the chunk. If the writeQueue is full, then this will block.
+			a.persist(a.CurrentChunkPos)
+		}
 
 		a.CurrentChunkPos++
 		if a.CurrentChunkPos >= int(a.NumChunks) {
@@ -490,6 +415,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
 		}
 		log.Debug("AM %s Add(): cleared chunk at %d of %d and replaced with new. and added the new point: %s", a.Key, a.CurrentChunkPos, len(a.Chunks), a.Chunks[a.CurrentChunkPos])
 		}
+		a.lastWrite = uint32(time.Now().Unix())
 
 	}
 	a.addAggregators(ts, val)
@@ -503,13 +429,13 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
 		return false
 	}
 
-	if currentChunk.LastWrite < chunkMinTs {
-		if currentChunk.Saved {
+	if a.lastWrite < chunkMinTs {
+		if a.lastSaveStart >= currentChunk.T0 {
 			// already saved. lets check if we should just delete the metric from memory.
-			if currentChunk.LastWrite < metricMinTs {
+			if a.lastWrite < metricMinTs {
 				return true
 			}
-		} else if !currentChunk.Saving {
+		} else if a.lastSaveStart < currentChunk.T0 {
			// chunk hasn't been written to in a while, and is not yet queued to persist. Let's persist it
			log.Debug("Found stale Chunk, persisting it to Cassandra. key: %s T0: %d", a.Key, currentChunk.T0)
			a.persist(a.CurrentChunkPos)

0 commit comments