@@ -41,7 +41,7 @@ type AggMetric struct {
41
41
ttl uint32
42
42
lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
43
43
lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
44
- lastWrite uint32
44
+ lastWrite uint32 // wall clock time of when last point was successfully added (possibly to the ROB)
45
45
firstTs uint32
46
46
}
47
47
@@ -331,7 +331,7 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) {
331
331
return result , nil
332
332
}
333
333
334
- // this function must only be called while holding the lock
334
+ // caller must hold lock
335
335
func (a * AggMetric ) addAggregators (ts uint32 , val float64 ) {
336
336
for _ , agg := range a .aggregators {
337
337
log .Debugf ("AM: %s pushing %d,%f to aggregator %d" , a .Key , ts , val , agg .span )
@@ -340,7 +340,7 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
340
340
}
341
341
342
342
// pushToCache adds the chunk into the cache if it is hot
343
- // assumes lock held by caller
343
+ // caller must hold lock
344
344
func (a * AggMetric ) pushToCache (c * chunk.Chunk ) {
345
345
if a .cachePusher == nil {
346
346
return
@@ -355,9 +355,10 @@ func (a *AggMetric) pushToCache(c *chunk.Chunk) {
355
355
go a .cachePusher .AddIfHot (a .Key , 0 , itergen )
356
356
}
357
357
358
- // write a chunk to persistent storage. This should only be called while holding a.Lock()
358
+ // write a chunk to persistent storage.
359
359
// never persist a chunk that may receive further updates!
360
360
// (because the stores will read out chunk data on the unlocked chunk)
361
+ // caller must hold lock.
361
362
func (a * AggMetric ) persist (pos int ) {
362
363
chunk := a .Chunks [pos ]
363
364
pre := time .Now ()
@@ -443,7 +444,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
443
444
}
444
445
445
446
// don't ever call with a ts of 0, cause we use 0 to mean not initialized!
446
- // assumes a write lock is held by the call-site
447
+ // caller must hold write lock
447
448
func (a * AggMetric ) add (ts uint32 , val float64 ) {
448
449
t0 := ts - (ts % a .ChunkSpan )
449
450
@@ -546,22 +547,18 @@ func (a *AggMetric) add(ts uint32, val float64) {
546
547
// any reasonable realtime stream (e.g. up to 15 min behind wall-clock)
547
548
// could add points to the chunk
548
549
//
549
- // caller must hold AggMetric lock
550
+ // caller must hold lock
550
551
func (a * AggMetric ) collectable (now , chunkMinTs uint32 ) bool {
551
552
552
- var currentChunk * chunk.Chunk
553
- if len (a .Chunks ) != 0 {
554
- currentChunk = a .getChunk (a .CurrentChunkPos )
555
- }
556
-
557
553
// no chunks at all means "possibly collectable"
558
554
// the caller (AggMetric.GC()) still has its own checks to
559
555
// handle the "no chunks" correctly later.
560
556
// also: we want AggMetric.GC() to go ahead with flushing the ROB in this case
561
- if currentChunk == nil {
557
+ if len ( a . Chunks ) == 0 {
562
558
return a .lastWrite < chunkMinTs
563
559
}
564
560
561
+ currentChunk := a .getChunk (a .CurrentChunkPos )
565
562
return a .lastWrite < chunkMinTs && currentChunk .Series .T0 + a .ChunkSpan + 15 * 60 < now
566
563
}
567
564
@@ -572,7 +569,7 @@ func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) bool {
572
569
a .Lock ()
573
570
defer a .Unlock ()
574
571
575
- // abort unless it looks like the AggMetric is collectable
572
+ // unless it looks like the AggMetric is collectable, abort and mark as not stale
576
573
if ! a .collectable (now , chunkMinTs ) {
577
574
return false
578
575
}
@@ -627,6 +624,7 @@ func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) bool {
627
624
return false
628
625
}
629
626
627
+ // gcAggregators returns whether all aggregators are stale and can be removed
630
628
func (a * AggMetric ) gcAggregators (now , chunkMinTs , metricMinTs uint32 ) bool {
631
629
ret := true
632
630
for _ , agg := range a .aggregators {
0 commit comments