This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit fb2a833

Merge pull request #869 from grafana/issue-844
fix: GC task too eagerly closes chunks. #844
2 parents 5240c14 + 24564d8 commit fb2a833

File tree: 3 files changed, +67 -33 lines changed

mdata/aggmetric.go (+60 -26)
@@ -561,16 +561,45 @@ func (a *AggMetric) add(ts uint32, val float64) {
 	a.addAggregators(ts, val)
 }
 
+// collectable returns whether the AggMetric is garbage collectable
+// an AggMetric is collectable based on two conditions:
+// * the AggMetric hasn't been written to in a configurable amount of time
+//   (whether the write went to the ROB or a chunk is irrelevant)
+// * the last chunk - if any - is no longer "active".
+//   active means:
+//   any reasonable realtime stream (e.g. up to 15 min behind wall-clock)
+//   could add points to the chunk
+//
+// caller must hold AggMetric lock
+func (a *AggMetric) collectable(now, chunkMinTs uint32) bool {
+
+	currentChunk := a.getChunk(a.CurrentChunkPos)
+
+	// no chunks at all means "possibly collectable"
+	// the caller (AggMetric.GC()) still has its own checks to
+	// handle the "no chunks" case correctly later.
+	// also: we want AggMetric.GC() to go ahead with flushing the ROB in this case
+	if currentChunk == nil {
+		return a.lastWrite < chunkMinTs
+	}
+
+	return a.lastWrite < chunkMinTs && currentChunk.Series.T0+a.ChunkSpan+15*60 < now
+}
+
 // GC returns whether or not this AggMetric is stale and can be removed
 // chunkMinTs -> min timestamp of a chunk before to be considered stale and to be persisted to Cassandra
 // metricMinTs -> min timestamp for a metric before to be considered stale and to be purged from the tank
-func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
+func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) bool {
 	a.Lock()
 	defer a.Unlock()
 
-	// if the reorderBuffer is enabled and we have not received a datapoint in a while,
-	// then flush the reorder buffer.
-	if a.rob != nil && a.lastWrite < chunkMinTs {
+	// abort unless it looks like the AggMetric is collectable
+	if !a.collectable(now, chunkMinTs) {
+		return false
+	}
+
+	// make sure any points in the reorderBuffer are moved into our chunks so we can save the data
+	if a.rob != nil {
 		tmpLastWrite := a.lastWrite
 		pts := a.rob.Flush()
 		for _, p := range pts {
@@ -582,44 +611,49 @@ func (a *AggMetric) GC(chunkMinTs, metricMinTs uint32) bool {
 	}
 
 	// this aggMetric has never had metrics written to it.
-	// if the rob is empty or disabled, this AggMetric can be deleted
 	if len(a.Chunks) == 0 {
-		return (a.rob == nil || a.rob.IsEmpty()) && a.gcAggregators(chunkMinTs, metricMinTs)
+		return a.gcAggregators(now, chunkMinTs, metricMinTs)
 	}
 
 	currentChunk := a.getChunk(a.CurrentChunkPos)
 	if currentChunk == nil {
 		return false
 	}
 
-	if a.lastWrite < chunkMinTs {
-		if currentChunk.Closed {
-			// already closed and should be saved, though we can't guarantee that.
-			// Check if we should just delete the metric from memory.
-			if a.lastWrite < metricMinTs {
-				return a.gcAggregators(chunkMinTs, metricMinTs)
-			}
-		} else {
-			// chunk hasn't been written to in a while, and is not yet closed. Let's close it and persist it if
-			// we are a primary
-			log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0)
-			currentChunk.Finish()
-			if cluster.Manager.IsPrimary() {
-				if LogLevel < 2 {
-					log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0)
-				}
-				// persist the chunk. If the writeQueue is full, then this will block.
-				a.persist(a.CurrentChunkPos)
+	// we must check collectable again. Imagine this scenario:
+	// * we didn't have any chunks when calling collectable() the first time so it returned true
+	// * data from the ROB is flushed and moved into a new chunk
+	// * this new chunk is active so we're not collectable, even though earlier we thought we were.
+	if !a.collectable(now, chunkMinTs) {
+		return false
+	}
+
+	if currentChunk.Closed {
+		// already closed and should be saved, though we can't guarantee that.
+		// Check if we should just delete the metric from memory.
+		if a.lastWrite < metricMinTs {
+			return a.gcAggregators(now, chunkMinTs, metricMinTs)
+		}
+	} else {
+		// chunk hasn't been written to in a while, and is not yet closed. Let's close it and persist it if
+		// we are a primary
+		log.Debug("Found stale Chunk, adding end-of-stream bytes. key: %s T0: %d", a.Key, currentChunk.T0)
+		currentChunk.Finish()
+		if cluster.Manager.IsPrimary() {
+			if LogLevel < 2 {
+				log.Debug("AM persist(): node is primary, saving chunk. %s T0: %d", a.Key, currentChunk.T0)
 			}
+			// persist the chunk. If the writeQueue is full, then this will block.
+			a.persist(a.CurrentChunkPos)
 		}
 	}
 	return false
 }
 
-func (a *AggMetric) gcAggregators(chunkMinTs, metricMinTs uint32) bool {
+func (a *AggMetric) gcAggregators(now, chunkMinTs, metricMinTs uint32) bool {
 	ret := true
 	for _, agg := range a.aggregators {
-		ret = agg.GC(chunkMinTs, metricMinTs, a.lastWrite) && ret
+		ret = agg.GC(now, chunkMinTs, metricMinTs, a.lastWrite) && ret
 	}
 	return ret
 }
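For intuition, here is a minimal standalone sketch of the arithmetic behind the new collectable() check; the helper signature and the sample values are hypothetical, not taken from the commit. A metric only becomes a GC candidate once it is write-stale (lastWrite < chunkMinTs) and the current chunk's span plus the 15-minute grace window has fully elapsed.

package main

import "fmt"

// collectable re-expresses the condition added in this commit as a pure
// function: the metric must be write-stale AND the current chunk must no
// longer be "active", i.e. its span plus a 15 minute grace window has passed.
func collectable(now, chunkMinTs, lastWrite, chunkT0, chunkSpan uint32) bool {
	return lastWrite < chunkMinTs && chunkT0+chunkSpan+15*60 < now
}

func main() {
	// hypothetical values: 10-minute chunks, current chunk started at t=6000,
	// staleness cutoff chunkMinTs=6600, last write at t=6100
	const chunkSpan, chunkT0, lastWrite, chunkMinTs = 600, 6000, 6100, 6600

	// at now=7000 a near-realtime stream could still write into the chunk
	// (7000 is not past 6000+600+900 = 7500), so it is not collectable
	fmt.Println(collectable(7000, chunkMinTs, lastWrite, chunkT0, chunkSpan)) // false

	// at now=7501 the grace window has elapsed, so it is collectable
	fmt.Println(collectable(7501, chunkMinTs, lastWrite, chunkT0, chunkSpan)) // true
}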

mdata/aggmetrics.go (+1 -1)
@@ -62,7 +62,7 @@ func (ms *AggMetrics) GC() {
 		ms.RLock()
 		a := ms.Metrics[key]
 		ms.RUnlock()
-		if stale := a.GC(chunkMinTs, metricMinTs); stale {
+		if a.GC(now, chunkMinTs, metricMinTs) {
 			log.Debug("metric %s is stale. Purging data from memory.", key)
 			ms.Lock()
 			delete(ms.Metrics, key)
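For orientation, a rough self-contained sketch of how the GC loop might derive the three timestamps it now passes to AggMetric.GC; the staleness-window names chunkMaxStale and metricMaxStale and their sample values are assumptions for illustration, not taken from this diff.

package main

import (
	"fmt"
	"time"
)

func main() {
	// staleness windows; the names chunkMaxStale / metricMaxStale and the
	// values here are assumptions for illustration only
	const chunkMaxStale = uint32(30 * 60)   // 30 minutes
	const metricMaxStale = uint32(6 * 3600) // 6 hours

	now := uint32(time.Now().Unix())
	chunkMinTs := now - chunkMaxStale   // last write older than this => chunk is stale
	metricMinTs := now - metricMaxStale // last write older than this => whole metric is purgeable

	// these are the three arguments the loop above now hands to a.GC(...)
	fmt.Println(now, chunkMinTs, metricMinTs)
}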

mdata/aggregator.go (+6 -6)
@@ -113,7 +113,7 @@ func (agg *Aggregator) Add(ts uint32, val float64) {
 	}
 }
 
-func (agg *Aggregator) GC(chunkMinTs, metricMinTs, lastWriteTime uint32) bool {
+func (agg *Aggregator) GC(now, chunkMinTs, metricMinTs, lastWriteTime uint32) bool {
 	ret := true
 
 	if lastWriteTime+agg.span > chunkMinTs {
@@ -127,19 +127,19 @@ func (agg *Aggregator) GC(chunkMinTs, metricMinTs, lastWriteTime uint32) bool {
 	}
 
 	if agg.minMetric != nil {
-		ret = agg.minMetric.GC(chunkMinTs, metricMinTs) && ret
+		ret = agg.minMetric.GC(now, chunkMinTs, metricMinTs) && ret
 	}
 	if agg.maxMetric != nil {
-		ret = agg.maxMetric.GC(chunkMinTs, metricMinTs) && ret
+		ret = agg.maxMetric.GC(now, chunkMinTs, metricMinTs) && ret
 	}
 	if agg.sumMetric != nil {
-		ret = agg.sumMetric.GC(chunkMinTs, metricMinTs) && ret
+		ret = agg.sumMetric.GC(now, chunkMinTs, metricMinTs) && ret
 	}
 	if agg.cntMetric != nil {
-		ret = agg.cntMetric.GC(chunkMinTs, metricMinTs) && ret
+		ret = agg.cntMetric.GC(now, chunkMinTs, metricMinTs) && ret
 	}
 	if agg.lstMetric != nil {
-		ret = agg.lstMetric.GC(chunkMinTs, metricMinTs) && ret
+		ret = agg.lstMetric.GC(now, chunkMinTs, metricMinTs) && ret
 	}
 
 	return ret
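The aggregator change is mechanical: now is threaded through unchanged so every rolled-up series (min/max/sum/cnt/lst) is judged against the same grace window, and the aggregator only reports itself collectable when all of them agree. Below is a minimal sketch of that fan-out pattern using simplified stand-in types, not the real mdata structs.

package main

import "fmt"

// series stands in for a rolled-up AggMetric (min/max/sum/cnt/lst);
// the real type lives in mdata, this is a simplified stand-in.
type series struct{ lastWrite, chunkEnd uint32 }

func (s *series) gc(now, chunkMinTs uint32) bool {
	return s.lastWrite < chunkMinTs && s.chunkEnd+15*60 < now
}

// gcAll mirrors the Aggregator.GC pattern: every rolled-up series must
// agree before the aggregator as a whole is considered collectable.
func gcAll(now, chunkMinTs uint32, rollups []*series) bool {
	ret := true
	for _, s := range rollups {
		ret = s.gc(now, chunkMinTs) && ret
	}
	return ret
}

func main() {
	rollups := []*series{
		{lastWrite: 6100, chunkEnd: 6600}, // stale and past the grace window
		{lastWrite: 6100, chunkEnd: 7400}, // still inside the grace window
	}
	fmt.Println(gcAll(8000, 6600, rollups)) // false: the second series holds GC back
}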

0 commit comments
