Skip to content

Commit

Permalink
Improve query cache hit rate
Browse files Browse the repository at this point in the history
  • Loading branch information
prymitive committed Mar 17, 2022
1 parent 9144913 commit dafcf5e
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 51 deletions.
7 changes: 7 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v0.15.2

### Fixed

- Improved query cache hit rate and added the `pint_prometheus_cache_hits_total`
  metric to track the number of cache hits.

## v0.15.1

### Added
Expand Down
1 change: 1 addition & 0 deletions internal/promapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (p *Prometheus) Config(ctx context.Context) (*ConfigResult, error) {

// Serve the result from the cache when it holds an entry for this key.
if v, ok := p.cache.Get(key); ok {
	log.Debug().Str("key", key).Str("uri", p.uri).Msg("Config cache hit")
	// Inc() is required here: WithLabelValues only looks up the labelled
	// counter, so without it the cache hit would never be recorded.
	prometheusCacheHitsTotal.WithLabelValues(p.name, "/api/v1/status/config").Inc()
	cfg := v.(ConfigResult)
	return &cfg, nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/promapi/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ func CanRetryError(err error, delta time.Duration) (time.Duration, bool) {

var neterr net.Error
if ok := errors.As(err, &neterr); ok && neterr.Timeout() {
return delta / 2, true
return (delta / 2).Round(time.Minute), true
}

var apiErr *v1.Error
if ok := errors.As(err, &apiErr); ok {
switch apiErr.Type {
case v1.ErrBadData:
case v1.ErrTimeout:
return delta / 2, true
return (delta / 2).Round(time.Minute), true
case v1.ErrCanceled:
case v1.ErrExec:
if strings.Contains(apiErr.Msg, "query processing would load too many samples into memory in ") {
return (delta / 4) * 3, true
return (delta / 4).Round(time.Minute), true
}
return delta / 2, true
case v1.ErrBadResponse:
case v1.ErrServer:
case v1.ErrClient:
return delta / 2, true
return (delta / 2).Round(time.Minute), true
}
}

Expand Down
8 changes: 8 additions & 0 deletions internal/promapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
)

var (
// prometheusCacheHitsTotal counts queries that were answered from a cache.
// It is partitioned by the "name" and "endpoint" labels.
prometheusCacheHitsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pint_prometheus_cache_hits_total",
Help: "Total number of all prometheus queries served from a cache",
},
[]string{"name", "endpoint"},
)
prometheusQueriesTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "pint_prometheus_queries_total",
Expand All @@ -27,6 +34,7 @@ var (
)

// RegisterMetrics registers every collector declared by this package using
// prometheus.MustRegister, which panics if a registration fails.
func RegisterMetrics() {
	prometheus.MustRegister(
		prometheusCacheHitsTotal,
		prometheusQueriesTotal,
		prometheusQueryErrorsTotal,
	)
}
Expand Down
1 change: 1 addition & 0 deletions internal/promapi/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (p *Prometheus) Query(ctx context.Context, expr string) (*QueryResult, erro

if v, ok := p.cache.Get(expr); ok {
log.Debug().Str("key", expr).Str("uri", p.uri).Msg("Query cache hit")
prometheusCacheHitsTotal.WithLabelValues(p.name, "/api/v1/query").Inc()
r := v.(QueryResult)
return &r, nil
}
Expand Down
44 changes: 33 additions & 11 deletions internal/promapi/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type RangeQueryResult struct {
}

func (p *Prometheus) RangeQuery(ctx context.Context, expr string, start, end time.Time, step time.Duration) (*RangeQueryResult, error) {
start = start.Round(time.Second)
end = end.Round(time.Second)

log.Debug().
Str("uri", p.uri).
Str("query", expr).
Expand All @@ -33,14 +36,24 @@ func (p *Prometheus) RangeQuery(ctx context.Context, expr string, start, end tim

lockKey := "/api/v1/query/range"
p.lock.lock(lockKey)
defer p.lock.unlock(lockKey)

cacheKey := strings.Join([]string{expr, start.String(), end.String(), step.String()}, "\n")
return p.realRangeQuery(ctx, expr, start, end, step, cacheKey, false)
}

func (p *Prometheus) realRangeQuery(
ctx context.Context,
expr string, start, end time.Time, step time.Duration,
cacheKey string, isRetry bool,
) (*RangeQueryResult, error) {
if v, ok := p.cache.Get(cacheKey); ok {
log.Debug().Str("key", cacheKey).Str("uri", p.uri).Msg("Range query cache hit")
prometheusCacheHitsTotal.WithLabelValues(p.name, "/api/v1/query_range").Inc()
r := v.(RangeQueryResult)
p.lock.unlock((lockKey))
return &r, nil
}
log.Debug().Str("key", cacheKey).Str("uri", p.uri).Msg("Range query cache miss")

prometheusQueriesTotal.WithLabelValues(p.name, "/api/v1/query_range").Inc()
r := v1.Range{
Expand All @@ -49,23 +62,31 @@ func (p *Prometheus) RangeQuery(ctx context.Context, expr string, start, end tim
Step: step,
}

p.slowQueryLock.Lock()
if v, ok := p.slowQueryCache.Get(expr); ok {
log.Debug().
Str("query", expr).
Str("delta", output.HumanizeDuration(v.(time.Duration))).
Msg("Got cached slow query delta")
r.Start.Add(v.(time.Duration))
if !isRetry {
p.slowQueryLock.Lock()
if v, ok := p.slowQueryCache.Get(expr); ok {
log.Debug().
Str("query", expr).
Str("delta", output.HumanizeDuration(v.(time.Duration))).
Str("start", r.Start.String()).
Str("cached", r.End.Add(v.(time.Duration)*-1).String()).
Msg("Got cached slow query delta")
r.Start = r.End.Add(v.(time.Duration) * -1)
}
p.slowQueryLock.Unlock()
}
p.slowQueryLock.Unlock()

rctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()

log.Debug().
Str("uri", p.uri).
Str("query", expr).
Bool("retry", isRetry).
Msg("Executing range query")
qstart := time.Now()
result, _, err := p.api.QueryRange(rctx, expr, r)
duration := time.Since(qstart)
p.lock.unlock((lockKey))
log.Debug().
Str("uri", p.uri).
Str("query", expr).
Expand All @@ -81,9 +102,10 @@ func (p *Prometheus) RangeQuery(ctx context.Context, expr string, start, end tim
}
log.Warn().Str("delta", output.HumanizeDuration(delta)).Msg("Retrying request with smaller range")
p.slowQueryLock.Lock()
p.slowQueryCache.Remove(expr)
p.slowQueryCache.Add(expr, delta)
p.slowQueryLock.Unlock()
return p.RangeQuery(ctx, expr, start.Add(delta), end, step)
return p.realRangeQuery(ctx, expr, end.Add(delta*-1), end, step, cacheKey, true)
}
return nil, err
}
Expand Down
Loading

0 comments on commit dafcf5e

Please sign in to comment.