Skip to content

Commit

Permalink
dont waste space on the chunkRefMap
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>
  • Loading branch information
replay committed Feb 8, 2022
1 parent 644605d commit 8e3bf56
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions tsdb/chunks/chunk_write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

const chunkRefMapFreeThreshold = 10

type chunkWriteJob struct {
cutFile bool
seriesRef HeadSeriesRef
Expand All @@ -38,8 +40,9 @@ type chunkWriteJob struct {
type chunkWriteQueue struct {
jobs chan chunkWriteJob

chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapPeak int

isRunningMtx sync.Mutex // Protects the isRunning property.
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.
Expand Down Expand Up @@ -69,7 +72,7 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu

q := &chunkWriteQueue{
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
writeChunk: writeChunk,

adds: counters.WithLabelValues("add"),
Expand Down Expand Up @@ -112,6 +115,14 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
delete(c.chunkRefMap, job.ref)

c.completed.Inc()

if len(c.chunkRefMap) == 0 && c.chunkRefMapPeak > chunkRefMapFreeThreshold {
// Re-initialize the chunk ref map to half of the peak size that was in use since the last re-init event.
// By setting it to half of the peak we try to minimize the number of allocations required for a "normal" usage
// while ensuring that if its usage has decreased we shrink it over time.
c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeak/2)
c.chunkRefMapPeak = 0
}
}

func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
Expand All @@ -121,6 +132,8 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
}
}()

// c.isRunningMtx serializes the adding of jobs to the c.chunkRefMap, if c.jobs is full then c.addJob() will block
// while holding c.isRunningMtx, this guarantees that c.chunkRefMap won't ever grow beyond the queue size + 1.
c.isRunningMtx.Lock()
defer c.isRunningMtx.Unlock()

Expand All @@ -130,6 +143,11 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {

c.chunkRefMapMtx.Lock()
c.chunkRefMap[job.ref] = job.chk

// Keep track of the peak usage of c.chunkRefMap.
if len(c.chunkRefMap) > c.chunkRefMapPeak {
c.chunkRefMapPeak = len(c.chunkRefMap)
}
c.chunkRefMapMtx.Unlock()

c.jobs <- job
Expand Down

0 comments on commit 8e3bf56

Please sign in to comment.