diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go
index 5cdd2e81f0..e7b9a4cc3c 100644
--- a/tsdb/chunks/chunk_write_queue.go
+++ b/tsdb/chunks/chunk_write_queue.go
@@ -22,6 +22,8 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )
 
+const chunkRefMapFreeThreshold = 10
+
 type chunkWriteJob struct {
 	cutFile   bool
 	seriesRef HeadSeriesRef
@@ -38,8 +40,9 @@ type chunkWriteJob struct {
 type chunkWriteQueue struct {
 	jobs chan chunkWriteJob
 
-	chunkRefMapMtx sync.RWMutex
-	chunkRefMap    map[ChunkDiskMapperRef]chunkenc.Chunk
+	chunkRefMapMtx  sync.RWMutex
+	chunkRefMap     map[ChunkDiskMapperRef]chunkenc.Chunk
+	chunkRefMapPeak int
 
 	isRunningMtx sync.Mutex // Protects the isRunning property.
 	isRunning    bool       // Used to prevent that new jobs get added to the queue when the chan is already closed.
@@ -69,7 +72,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
 
 	q := &chunkWriteQueue{
 		jobs:        make(chan chunkWriteJob, size),
-		chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
+		chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
 		writeChunk:  writeChunk,
 
 		adds: counters.WithLabelValues("add"),
@@ -112,6 +115,14 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
 	delete(c.chunkRefMap, job.ref)
 
 	c.completed.Inc()
+
+	if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold {
+		// Re-initialize the chunk ref map to half of the peak size that was in use since the last re-init event.
+		// By setting it to half of the peak we try to minimize the number of allocations required for "normal" usage,
+		// while ensuring that we shrink it over time if its usage has decreased.
+		c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeak/2)
+		c.chunkRefMapPeak = 0
+	}
 }
 
 func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
@@ -121,6 +132,8 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
 		}
 	}()
 
+	// c.isRunningMtx serializes the adding of jobs to c.chunkRefMap; if c.jobs is full then c.addJob() will block
+	// while holding c.isRunningMtx, which guarantees that c.chunkRefMap won't ever grow beyond the queue size + 1.
 	c.isRunningMtx.Lock()
 	defer c.isRunningMtx.Unlock()
 
@@ -130,6 +143,11 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
 
 	c.chunkRefMapMtx.Lock()
 	c.chunkRefMap[job.ref] = job.chk
+
+	// Keep track of the peak usage of c.chunkRefMap.
+	if len(c.chunkRefMap) > c.chunkRefMapPeak {
+		c.chunkRefMapPeak = len(c.chunkRefMap)
+	}
 	c.chunkRefMapMtx.Unlock()
 
 	c.jobs <- job
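
For context, the shrinking heuristic introduced by this diff can be exercised in isolation. The sketch below is not part of the change: `shrinkableMap`, its `add`/`complete` methods, and the `freeThreshold` constant are hypothetical stand-ins for `chunkRefMap`, `chunkRefMapPeak`, and `chunkRefMapFreeThreshold`, and only mirror the peak tracking in `addJob` and the re-init-on-empty logic in `processJob`.

```go
package main

import "fmt"

// freeThreshold mirrors chunkRefMapFreeThreshold: only shrink if the map
// actually grew beyond this many entries since the last re-init.
const freeThreshold = 10

// shrinkableMap is a hypothetical stand-in for chunkWriteQueue.chunkRefMap
// plus its peak counter; it is not part of the Prometheus code base.
type shrinkableMap struct {
	m    map[uint64]string
	peak int
}

func newShrinkableMap() *shrinkableMap {
	return &shrinkableMap{m: make(map[uint64]string)}
}

func (s *shrinkableMap) add(ref uint64, v string) {
	s.m[ref] = v
	// Track the peak usage since the last re-init, as addJob does.
	if len(s.m) > s.peak {
		s.peak = len(s.m)
	}
}

func (s *shrinkableMap) complete(ref uint64) {
	delete(s.m, ref)
	// Once the map is drained, re-allocate it at half the observed peak,
	// as processJob does: an idle queue releases most of its memory while
	// a busy one keeps enough capacity to avoid frequent re-growth.
	if len(s.m) == 0 && s.peak > freeThreshold {
		s.m = make(map[uint64]string, s.peak/2)
		s.peak = 0
	}
}

func main() {
	s := newShrinkableMap()
	for i := uint64(0); i < 100; i++ {
		s.add(i, "chunk")
	}
	for i := uint64(0); i < 100; i++ {
		s.complete(i)
	}
	// Prints "0 0": the map was re-initialized with capacity 50 (half the peak).
	fmt.Println(len(s.m), s.peak)
}
```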