Skip to content

Commit

Permalink
Address review feedback. Renamed "free" to "shrink" everywhere, updat…
Browse files Browse the repository at this point in the history
…ed comments and threshold to 1000.
  • Loading branch information
pstibrany committed Jun 1, 2022
1 parent be28c10 commit d7a75bc
Showing 1 changed file with 41 additions and 36 deletions.
77 changes: 41 additions & 36 deletions tsdb/chunks/chunk_write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (
)

const (
// Minimum recorded peak since since the last freeing
// of chunkWriteQueue.chunkrefMap to free it again.
chunkRefMapFreeThreshold = 10
// Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again.
chunkRefMapShrinkThreshold = 1000

// Minimum interval between freeing of chunkWriteQueue.chunkRefMap.
chunkRefMapMinFreeInterval = 10 * time.Minute
// Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.
chunkRefMapMinShrinkInterval = 10 * time.Minute
)

type chunkWriteJob struct {
Expand All @@ -48,24 +47,28 @@ type chunkWriteJob struct {
type chunkWriteQueue struct {
jobs chan chunkWriteJob

chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time it got freed.
chunkRefMapLastFree time.Time // When the chunkRefMap has been freed the last time.
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it.
chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time.

isRunningMtx sync.Mutex // Protects the isRunning property.
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.
// isRunningMtx serves two purposes:
// 1. It protects isRunning field.
// 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block
// while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1.
isRunningMtx sync.Mutex
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.

workerWg sync.WaitGroup

writeChunk writeChunkF

// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
// Keeping separate counters instead of only a single CounterVec to improve the performance of the critical
// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
adds prometheus.Counter
gets prometheus.Counter
completed prometheus.Counter
free prometheus.Counter
shrink prometheus.Counter
}

// writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests.
Expand All @@ -81,15 +84,15 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
)

q := &chunkWriteQueue{
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
chunkRefMapLastFree: time.Now(),
writeChunk: writeChunk,
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
chunkRefMapLastShrink: time.Now(),
writeChunk: writeChunk,

adds: counters.WithLabelValues("add"),
gets: counters.WithLabelValues("get"),
completed: counters.WithLabelValues("complete"),
free: counters.WithLabelValues("free"),
shrink: counters.WithLabelValues("shrink"),
}

if reg != nil {
Expand Down Expand Up @@ -128,37 +131,41 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {

c.completed.Inc()

c.freeChunkRefMap()
c.shrinkChunkRefMap()
}

// freeChunkRefMap checks whether the conditions to free the chunkRefMap are met,
// if so it re-initializes it and frees the memory which it currently uses.
// The chunkRefMapMtx must be held when calling this method.
func (c *chunkWriteQueue) freeChunkRefMap() {
// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met,
// if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method.
//
// We do this because Go runtime doesn't release internal memory used by map after map has been emptied.
// To achieve that we create new map instead and throw the old one away.
func (c *chunkWriteQueue) shrinkChunkRefMap() {
if len(c.chunkRefMap) > 0 {
// Can't free it while there is data in it.
// Can't shrink it while there is data in it.
return
}

if c.chunkRefMapPeakSize < chunkRefMapFreeThreshold {
// Not freeing it because it has not grown to the minimum threshold yet.
if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold {
// Not shrinking it because it has not grown to the minimum threshold yet.
return
}

if time.Since(c.chunkRefMapLastFree) < chunkRefMapMinFreeInterval {
// Not freeing it because the minimum duration between free-events has not passed yet.
now := time.Now()

if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval {
// Not shrinking it because the minimum duration between shrink-events has not passed yet.
return
}

// Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event.
// By initializing it to half of the peak size since the last re-init event we try to hit the sweet spot in the
// trade-off between initializing it to a very small size potentially resulting in many allocations to re-grow it,
// and initializing it to a large size potentially resulting in unused allocated memory.
// We are trying to hit the sweet spot in the trade-off between initializing it to a very small size
// potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially
// resulting in unused allocated memory.
c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2)

c.chunkRefMapPeakSize = 0
c.chunkRefMapLastFree = time.Now()
c.free.Inc()
c.chunkRefMapLastShrink = now
c.shrink.Inc()
}

func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
Expand All @@ -168,13 +175,11 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
}
}()

// c.isRunningMtx serializes the adding of jobs to the c.chunkRefMap, if c.jobs is full then c.addJob() will block
// while holding c.isRunningMtx, this guarantees that c.chunkRefMap won't ever grow beyond the queue size + 1.
c.isRunningMtx.Lock()
defer c.isRunningMtx.Unlock()

if !c.isRunning {
return errors.New("queue is not started")
return errors.New("queue is not running")
}

c.chunkRefMapMtx.Lock()
Expand Down

0 comments on commit d7a75bc

Please sign in to comment.