Track cache gets
prymitive committed Nov 28, 2022
1 parent ceca305 commit 4aed40c
Showing 5 changed files with 149 additions and 39 deletions.
29 changes: 17 additions & 12 deletions internal/promapi/cache.go
@@ -10,8 +10,9 @@ import (

type cacheEntry struct {
lst *list.Element
data queryResult
data any
expiresAt time.Time
lastGet time.Time
cost int
}

@@ -23,19 +24,21 @@ type endpointStats struct {
func (e *endpointStats) hit() { e.hits++ }
func (e *endpointStats) miss() { e.misses++ }

func newQueryCache(maxSize int) *queryCache {
func newQueryCache(maxSize int, maxStale time.Duration) *queryCache {
return &queryCache{
entries: map[uint64]*cacheEntry{},
stats: map[string]*endpointStats{},
maxCost: maxSize,
useList: list.New(),
entries: map[uint64]*cacheEntry{},
stats: map[string]*endpointStats{},
maxCost: maxSize,
maxStale: maxStale,
useList: list.New(),
}
}

type queryCache struct {
mu sync.Mutex
entries map[uint64]*cacheEntry
stats map[string]*endpointStats
maxStale time.Duration
cost int
maxCost int
evictions int
@@ -53,7 +56,7 @@ func (c *queryCache) endpointStats(endpoint string) *endpointStats {
return e
}

func (c *queryCache) get(key uint64, endpoint string) (v queryResult, ok bool) {
func (c *queryCache) get(key uint64, endpoint string) (v any, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

@@ -64,6 +67,7 @@ func (c *queryCache) get(key uint64, endpoint string) (v queryResult, ok bool) {
return v, ok
}

ce.lastGet = time.Now()
c.useList.MoveToFront(ce.lst)
c.endpointStats(endpoint).hit()

@@ -72,7 +76,7 @@

// Cache results if it was requested at least twice EVER - which means it's either
// popular and requested multiple times within a loop OR this cache key survives between loops.
func (c *queryCache) set(key uint64, val queryResult, ttl time.Duration, cost int, endpoint string) {
func (c *queryCache) set(key uint64, val any, ttl time.Duration, cost int, endpoint string) {
c.mu.Lock()
defer c.mu.Unlock()

@@ -92,9 +96,10 @@

c.cost += cost
c.entries[key] = &cacheEntry{
data: val,
cost: cost,
lst: lst,
data: val,
cost: cost,
lst: lst,
lastGet: time.Now(),
}
if ttl > 0 {
c.entries[key].expiresAt = time.Now().Add(ttl)
@@ -122,7 +127,7 @@ func (c *queryCache) gc() {

now := time.Now()
for key, ce := range c.entries {
if !ce.expiresAt.IsZero() && ce.expiresAt.Before(now) {
if (!ce.expiresAt.IsZero() && ce.expiresAt.Before(now)) || now.Sub(ce.lastGet) >= c.maxStale {
c.useList.Remove(ce.lst)
c.cost -= ce.cost
c.evictions++
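The core behavioural change above is in gc(): an entry is now evicted either when its TTL has expired or when it has not been fetched for at least maxStale, based on the new lastGet timestamp that get() refreshes on every hit. A minimal standalone sketch of that rule (the field names mirror the diff, but this is an illustration, not the package code):

package main

import (
	"fmt"
	"time"
)

// Mirrors the fields gc() looks at in the diff above; illustration only.
type entry struct {
	expiresAt time.Time // zero value means "no TTL"
	lastGet   time.Time // refreshed on every cache get
}

// evict reports whether gc() would drop the entry at time now,
// following the condition added in this commit.
func evict(e entry, now time.Time, maxStale time.Duration) bool {
	expired := !e.expiresAt.IsZero() && e.expiresAt.Before(now)
	stale := now.Sub(e.lastGet) >= maxStale
	return expired || stale
}

func main() {
	now := time.Now()
	maxStale := time.Minute

	fresh := entry{lastGet: now.Add(-10 * time.Second)}
	idle := entry{lastGet: now.Add(-2 * time.Minute)}
	expired := entry{expiresAt: now.Add(-time.Second), lastGet: now}

	fmt.Println(evict(fresh, now, maxStale))   // false: fetched recently, no TTL set
	fmt.Println(evict(idle, now, maxStale))    // true: not fetched for longer than maxStale
	fmt.Println(evict(expired, now, maxStale)) // true: TTL already expired
}

Because get() keeps moving lastGet forward, entries that are read regularly survive gc() even without a TTL, while idle entries are dropped once maxStale passes; TestQueryCacheEvictMaxStale below exercises exactly that.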
149 changes: 126 additions & 23 deletions internal/promapi/cache_test.go
@@ -14,11 +14,11 @@ import (
func TestQueryCacheOnlySet(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= maxSize; i++ {
cache.set(i, queryResult{err: mockErr}, 0, 1, "/foo")
cache.set(i, mockErr, 0, 1, "/foo")
}

require.Equal(t, maxSize, cache.cost)
@@ -30,11 +30,11 @@ func TestQueryCacheOnlySet(t *testing.T) {
func TestQueryCacheReplace(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

cache.set(6, queryResult{err: mockErr}, 0, 7, "/foo")
cache.set(6, queryResult{err: mockErr}, 0, 7, "/foo")
cache.set(6, queryResult{err: mockErr}, 0, 7, "/foo")
cache.set(6, mockErr, 0, 7, "/foo")
cache.set(6, mockErr, 0, 7, "/foo")
cache.set(6, mockErr, 0, 7, "/foo")

require.Equal(t, 7, cache.cost)
require.Equal(t, 1, len(cache.entries))
@@ -45,7 +45,7 @@ func TestQueryCacheReplace(t *testing.T) {
func TestQueryCacheGetAndSet(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= maxSize; i++ {
@@ -55,13 +55,13 @@ func TestQueryCacheGetAndSet(t *testing.T) {
require.Zero(t, v)

// first set
cache.set(i, queryResult{err: mockErr}, time.Minute, 2, "/foo")
cache.set(i, mockErr, time.Minute, 2, "/foo")

// second get, should be in cache now
v, ok = cache.get(i, "/foo")
require.Equal(t, true, ok, "should be present in cache on third get")
require.NotZero(t, v)
require.Equal(t, mockErr, v.err)
require.Equal(t, mockErr, v)
}

require.Equal(t, 100, cache.cost)
@@ -83,15 +83,15 @@ func TestQueryCacheGetAndSet(t *testing.T) {
func TestQueryCachePurgeMaxCost(t *testing.T) {
const maxSize = 460
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= 100; i++ {
cost := int(i % 10)
if cost == 0 {
cost = 1
}
cache.set(i, queryResult{err: mockErr}, 0, cost, "/foo")
cache.set(i, mockErr, 0, cost, "/foo")
_, _ = cache.get(i, "/foo")
}

@@ -106,7 +106,7 @@ func TestQueryCachePurgeMaxCost(t *testing.T) {
cost = 1
}
cost++
cache.set(i, queryResult{err: mockErr}, 0, cost, "/bar")
cache.set(i, mockErr, 0, cost, "/bar")
_, _ = cache.get(i, "/foo")
}
require.Equal(t, 460, cache.cost)
@@ -118,11 +118,11 @@ func TestQueryCachePurgeMaxCost(t *testing.T) {
func TestQueryCachePurgeZeroTTL(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= maxSize; i++ {
cache.set(i, queryResult{err: mockErr}, 0, 1, "/foo")
cache.set(i, mockErr, 0, 1, "/foo")
_, _ = cache.get(i, "/foo")
}
require.Equal(t, 100, cache.cost)
@@ -141,13 +141,13 @@ func TestQueryCachePurgeZeroTTL(t *testing.T) {
func TestQueryCachePurgeExpired(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= maxSize; i++ {
_, _ = cache.get(i, "/foo")
_, _ = cache.get(i, "/foo")
cache.set(i, queryResult{err: mockErr}, time.Second, 1, "/foo")
cache.set(i, mockErr, time.Second, 1, "/foo")
_, _ = cache.get(i, "/foo")
}
require.Equal(t, 100, cache.cost)
@@ -169,11 +169,11 @@ func TestQueryCachePurgeExpired(t *testing.T) {
func TestQueryCacheOverrideExpired(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= maxSize; i++ {
cache.set(i, queryResult{err: mockErr}, time.Second, 1, "/foo")
cache.set(i, mockErr, time.Second, 1, "/foo")
_, _ = cache.get(i, "/foo")
}
require.Equal(t, 100, cache.cost)
@@ -183,7 +183,7 @@ func TestQueryCacheOverrideExpired(t *testing.T) {

cache.entries[maxSize/2].expiresAt = time.Now().Add(time.Second * -1)

cache.set(maxSize+1, queryResult{err: mockErr}, time.Second, 1, "/foo")
cache.set(maxSize+1, mockErr, time.Second, 1, "/foo")
_, _ = cache.get(maxSize+1, "/foo")

require.Equal(t, 100, cache.cost)
@@ -195,11 +195,11 @@ func TestQueryCacheOverrideExpired(t *testing.T) {
func TestQueryCacheEvictLRU(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

var i, j uint64
for i = 1; i <= maxSize; i++ {
cache.set(i, queryResult{err: mockErr}, time.Second, 1, "/foo")
cache.set(i, mockErr, time.Second, 1, "/foo")
for j = 1; j <= i; j++ {
_, _ = cache.get(i, "/foo")
}
@@ -216,7 +216,7 @@ func TestQueryCacheEvictLRU(t *testing.T) {
require.Equal(t, 0, cache.evictions)

for i = maxSize + 1; i <= maxSize+20; i++ {
cache.set(i, queryResult{err: mockErr}, time.Second, 1, "/foo")
cache.set(i, mockErr, time.Second, 1, "/foo")
}
require.Equal(t, 100, cache.cost)
require.Equal(t, 100, len(cache.entries))
@@ -230,9 +230,53 @@ func TestQueryCacheEvictLRU(t *testing.T) {
}
}

func TestQueryCacheEvictMaxStale(t *testing.T) {
const maxSize = 100
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize, time.Second)

var i, j uint64
for i = 1; i <= maxSize; i++ {
cache.set(i, mockErr, time.Minute, 1, "/foo")
for j = 1; j <= i; j++ {
_, _ = cache.get(i, "/foo")
}
}
require.Equal(t, 100, cache.cost)
require.Equal(t, 100, len(cache.entries))
require.Equal(t, 100, cache.useList.Len())
require.Equal(t, 0, cache.evictions)

cache.gc()
require.Equal(t, 100, cache.cost)
require.Equal(t, 100, len(cache.entries))
require.Equal(t, 100, cache.useList.Len())
require.Equal(t, 0, cache.evictions)

time.Sleep(time.Second + time.Millisecond*100)
for i = 1; i <= 50; i++ {
_, _ = cache.get(i, "/foo")
}
cache.gc()
require.Equal(t, 50, cache.cost)
require.Equal(t, 50, len(cache.entries))
require.Equal(t, 50, cache.useList.Len())
require.Equal(t, 50, cache.evictions)

var ok bool
for i = 1; i <= 50; i++ {
_, ok = cache.get(i, "/foo")
require.True(t, ok)
}
for i = 51; i <= maxSize; i++ {
_, ok = cache.get(i, "/foo")
require.False(t, ok)
}
}

func TestCacheCollector(t *testing.T) {
const maxSize = 100
cache := newQueryCache(maxSize)
cache := newQueryCache(maxSize, time.Minute)

names := []string{
"pint_prometheus_cache_size",
@@ -344,3 +388,62 @@ pint_prometheus_cache_size{name="prom"} 100
names...,
))
}

func BenchmarkQueryCacheOnlySet(b *testing.B) {
const maxSize = 1000
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize, time.Minute)

endpoint := "/foo"
for n := 0; n < b.N; n++ {
cache.set(1, mockErr, 0, 1, endpoint)
}
}

func BenchmarkQueryCacheSetGrow(b *testing.B) {
const maxSize = 1000
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize, time.Minute)

var i uint64
for i = 1; i <= maxSize; i++ {
cache.set(i, mockErr, 0, 1, "/foo")
}

endpoint := "/foo"
for n := 1; n <= b.N; n++ {
cache.set(uint64(maxSize+n), mockErr, 0, 1, endpoint)
}
}

func BenchmarkQueryCacheGetMiss(b *testing.B) {
const maxSize = 1000
cache := newQueryCache(maxSize, time.Minute)

for n := 0; n < b.N; n++ {
cache.get(uint64(n), "/foo")
}
}

func BenchmarkQueryCacheGC(b *testing.B) {
const maxSize = 1000
mockErr := errors.New("Fake Error")
cache := newQueryCache(maxSize, time.Minute)

var i uint64
var ttl time.Duration
for n := 0; n < b.N; n++ {
b.StopTimer()
if n%2 == 0 {
ttl = 0
} else {
ttl = time.Millisecond
}
for i = 1; i <= maxSize; i++ {
cache.set(i, mockErr, ttl, 1, "/foo")
}
time.Sleep(time.Millisecond * 2)
b.StartTimer()
cache.gc()
}
}
2 changes: 1 addition & 1 deletion internal/promapi/failover.go
@@ -93,7 +93,7 @@ func (fg *FailoverGroup) IsEnabledForPath(path string) bool {
}

func (fg *FailoverGroup) StartWorkers() {
queryCache := newQueryCache(fg.cacheSize)
queryCache := newQueryCache(fg.cacheSize, time.Hour)
fg.quitChan = make(chan bool)
go cacheCleaner(queryCache, time.Minute*2, fg.quitChan)

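StartWorkers now builds the cache with a one-hour staleness limit while keeping the existing two-minute cleaner interval. cacheCleaner itself is not part of this diff; a plausible sketch, assuming it only needs to call gc() periodically until the quit channel is closed (the real implementation may differ):

package promapi // sketch only: cacheCleaner's body is not shown in this commit

import "time"

// cacheCleaner periodically runs gc() on the cache started by StartWorkers.
// gc() now drops both expired entries and entries whose lastGet is older
// than maxStale (see cache.go above).
func cacheCleaner(cache *queryCache, interval time.Duration, quit chan bool) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			cache.gc()
		}
	}
}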
4 changes: 2 additions & 2 deletions internal/promapi/prometheus.go
@@ -164,7 +164,7 @@ func processJob(prom *Prometheus, job queryRequest) queryResult {
Str("query", job.query.String()).
Uint64("key", cacheKey).
Msg("Cache hit")
return cached
return queryResult{value: cached}
}
}

@@ -204,7 +204,7 @@ func processJob(prom *Prometheus, job queryRequest) queryResult {
}

if prom.cache != nil {
prom.cache.set(cacheKey, result, job.query.CacheTTL(), cost, job.query.Endpoint())
prom.cache.set(cacheKey, result.value, job.query.CacheTTL(), cost, job.query.Endpoint())
}

return result
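With the cache now storing any, processJob caches only result.value and rebuilds a queryResult around the cached payload on a hit. A small self-contained illustration of that store-the-payload / wrap-on-hit pattern, using hypothetical stand-in types rather than the actual pint ones:

package main

import "fmt"

// result stands in for queryResult; only the value field is assumed here.
type result struct {
	value any
}

// cache stands in for the any-valued query cache.
type cache map[uint64]any

func main() {
	c := cache{}

	// On a miss the caller computes the result and stores only its payload,
	// mirroring prom.cache.set(cacheKey, result.value, ...).
	r := result{value: []string{"up", "node_exporter_build_info"}}
	c[42] = r.value

	// On a hit the cached payload is wrapped back into a result,
	// mirroring `return queryResult{value: cached}`.
	if cached, ok := c[42]; ok {
		hit := result{value: cached}
		// Callers that need a concrete type use a type assertion.
		if names, ok := hit.value.([]string); ok {
			fmt.Println(names)
		}
	}
}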