Skip to content

Commit

Permalink
add time factor
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>
  • Loading branch information
replay committed Feb 8, 2022
1 parent 8e3bf56 commit 20f3746
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions tsdb/chunks/chunk_write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package chunks
import (
"errors"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/prometheus/tsdb/chunkenc"
)

const chunkRefMapFreeThreshold = 10
const (
chunkRefMapFreeThreshold = 10
chunkRefMapMinFreeInterval = 10 * time.Minute
)

type chunkWriteJob struct {
cutFile bool
Expand All @@ -40,9 +44,10 @@ type chunkWriteJob struct {
type chunkWriteQueue struct {
jobs chan chunkWriteJob

chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapPeak int
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapPeak int
chunkRefMapLastFree time.Time

isRunningMtx sync.Mutex // Protects the isRunning property.
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.
Expand Down Expand Up @@ -71,9 +76,10 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
)

q := &chunkWriteQueue{
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
writeChunk: writeChunk,
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
chunkRefMapLastFree: time.Now(),
writeChunk: writeChunk,

adds: counters.WithLabelValues("add"),
gets: counters.WithLabelValues("get"),
Expand Down Expand Up @@ -116,12 +122,13 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {

c.completed.Inc()

if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold {
if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold && time.Since(c.chunkRefMapLastFree) > chunkRefMapMinFreeInterval {
// Re-initialize the chunk ref map to half of the peak size that was in use since the last re-init event.
// By setting it to half of the peak we try to minimize the number of allocations required for a "normal" usage
// while ensuring that if its usage has decreased we shrink it over time.
c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeak/2)
c.chunkRefMapPeak = 0
c.chunkRefMapLastFree = time.Now()
}
}

Expand Down

0 comments on commit 20f3746

Please sign in to comment.