From d7a75bc8f59b0892e19c0a0203517de3c47eb793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Wed, 1 Jun 2022 12:03:41 +0200 Subject: [PATCH] Address review feedback. Renamed "free" to "shrink" everywhere, updated comments and threshold to 1000. --- tsdb/chunks/chunk_write_queue.go | 77 +++++++++++++++++--------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 79220065ad..f766eb43a5 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -24,12 +24,11 @@ import ( ) const ( - // Minimum recorded peak since since the last freeing - // of chunkWriteQueue.chunkrefMap to free it again. - chunkRefMapFreeThreshold = 10 + // Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again. + chunkRefMapShrinkThreshold = 1000 - // Minimum interval between freeing of chunkWriteQueue.chunkRefMap. - chunkRefMapMinFreeInterval = 10 * time.Minute + // Minimum interval between shrinking of chunkWriteQueue.chunkRefMap. + chunkRefMapMinShrinkInterval = 10 * time.Minute ) type chunkWriteJob struct { @@ -48,24 +47,28 @@ type chunkWriteJob struct { type chunkWriteQueue struct { jobs chan chunkWriteJob - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time it got freed. - chunkRefMapLastFree time.Time // When the chunkRefMap has been freed the last time. + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk + chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it. + chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time. - isRunningMtx sync.Mutex // Protects the isRunning property. - isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. + // isRunningMtx serves two purposes: + // 1. It protects isRunning field. + // 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block + // while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1. + isRunningMtx sync.Mutex + isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed. workerWg sync.WaitGroup writeChunk writeChunkF - // Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical + // Keeping separate counters instead of only a single CounterVec to improve the performance of the critical // addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec. adds prometheus.Counter gets prometheus.Counter completed prometheus.Counter - free prometheus.Counter + shrink prometheus.Counter } // writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests. @@ -81,15 +84,15 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu ) q := &chunkWriteQueue{ - jobs: make(chan chunkWriteJob, size), - chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), - chunkRefMapLastFree: time.Now(), - writeChunk: writeChunk, + jobs: make(chan chunkWriteJob, size), + chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), + chunkRefMapLastShrink: time.Now(), + writeChunk: writeChunk, adds: counters.WithLabelValues("add"), gets: counters.WithLabelValues("get"), completed: counters.WithLabelValues("complete"), - free: counters.WithLabelValues("free"), + shrink: counters.WithLabelValues("shrink"), } if reg != nil { @@ -128,37 +131,41 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { c.completed.Inc() - c.freeChunkRefMap() + c.shrinkChunkRefMap() } -// freeChunkRefMap checks whether the conditions to free the chunkRefMap are met, -// if so it re-initializes it and frees the memory which it currently uses. -// The chunkRefMapMtx must be held when calling this method. -func (c *chunkWriteQueue) freeChunkRefMap() { +// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met, +// if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method. +// +// We do this because Go runtime doesn't release internal memory used by map after map has been emptied. +// To achieve that we create new map instead and throw the old one away. +func (c *chunkWriteQueue) shrinkChunkRefMap() { if len(c.chunkRefMap) > 0 { - // Can't free it while there is data in it. + // Can't shrink it while there is data in it. return } - if c.chunkRefMapPeakSize < chunkRefMapFreeThreshold { - // Not freeing it because it has not grown to the minimum threshold yet. + if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold { + // Not shrinking it because it has not grown to the minimum threshold yet. return } - if time.Since(c.chunkRefMapLastFree) < chunkRefMapMinFreeInterval { - // Not freeing it because the minimum duration between free-events has not passed yet. + now := time.Now() + + if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval { + // Not shrinking it because the minimum duration between shrink-events has not passed yet. return } // Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event. - // By initializing it to half of the peak size since the last re-init event we try to hit the sweet spot in the - // trade-off between initializing it to a very small size potentially resulting in many allocations to re-grow it, - // and initializing it to a large size potentially resulting in unused allocated memory. + // We are trying to hit the sweet spot in the trade-off between initializing it to a very small size + // potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially + // resulting in unused allocated memory. c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2) c.chunkRefMapPeakSize = 0 - c.chunkRefMapLastFree = time.Now() - c.free.Inc() + c.chunkRefMapLastShrink = now + c.shrink.Inc() } func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { @@ -168,13 +175,11 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { } }() - // c.isRunningMtx serializes the adding of jobs to the c.chunkRefMap, if c.jobs is full then c.addJob() will block - // while holding c.isRunningMtx, this guarantees that c.chunkRefMap won't ever grow beyond the queue size + 1. c.isRunningMtx.Lock() defer c.isRunningMtx.Unlock() if !c.isRunning { - return errors.New("queue is not started") + return errors.New("queue is not running") } c.chunkRefMapMtx.Lock()