Commit

Improve cache hit rate
prymitive committed Mar 18, 2022
1 parent 3ef96c0 commit 0483c1c
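
This commit changes RangeQuery to take a relative lookback time.Duration instead of absolute start and end time.Time arguments. Because start and end were always derived from time.Now(), the old cache key (expr + start + end + step) was different on every run, so the range-query cache almost never hit; keying on expr + lookback + step keeps the key stable across runs. A minimal sketch of the two key schemes (illustrative only, not code from this commit):

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	expr := `up == 0`
	step := time.Minute * 5
	lookback := time.Hour * 24

	// Old scheme: embeds wall-clock timestamps, so the key changes on every run.
	end := time.Now()
	start := end.Add(-1 * lookback)
	oldKey := strings.Join([]string{expr, start.String(), end.String(), step.String()}, "\n")

	// New scheme: only relative durations, so repeated runs produce the same key.
	newKey := strings.Join([]string{expr, lookback.String(), step.String()}, "\n")

	fmt.Println(oldKey) // different every time this program runs
	fmt.Println(newKey) // always "up == 0\n24h0m0s\n5m0s"
}
```
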
Showing 6 changed files with 195 additions and 137 deletions.
5 changes: 1 addition & 4 deletions internal/checks/alerts_count.go
@@ -49,10 +49,7 @@ func (c AlertsCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 		return
 	}
 
-	end := time.Now()
-	start := end.Add(-1 * c.lookBack)
-
-	qr, err := c.prom.RangeQuery(ctx, rule.AlertingRule.Expr.Value.Value, start, end, c.step)
+	qr, err := c.prom.RangeQuery(ctx, rule.AlertingRule.Expr.Value.Value, c.lookBack, c.step)
 	if err != nil {
 		text, severity := textAndSeverityFromError(err, c.Reporter(), c.prom.Name(), Bug)
 		problems = append(problems, Problem{

18 changes: 9 additions & 9 deletions internal/checks/promql_series.go
@@ -45,7 +45,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 		return
 	}
 
-	rangeStart := time.Now().Add(time.Hour * 24 * -7)
+	rangeLookback := time.Hour * 24 * 7
 	rangeStep := time.Minute * 5
 
 	done := map[string]bool{}
@@ -137,7 +137,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 
 	// 2. If foo was NEVER there -> BUG
 	log.Debug().Str("check", c.Reporter()).Stringer("selector", &bareSelector).Msg("Checking if base metric has historical series")
-	trs, err := c.serieTimeRanges(ctx, fmt.Sprintf("count(%s)", bareSelector.String()), rangeStart, rangeStep)
+	trs, err := c.serieTimeRanges(ctx, fmt.Sprintf("count(%s)", bareSelector.String()), rangeLookback, rangeStep)
 	if err != nil {
 		problems = append(problems, c.queryProblem(err, bareSelector.String(), expr))
 		continue
@@ -199,7 +199,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 	// 3. If foo is ALWAYS/SOMETIMES there BUT {bar OR baz} is NEVER there -> BUG
 	for _, name := range labelNames {
 		log.Debug().Str("check", c.Reporter()).Stringer("selector", &selector).Str("label", name).Msg("Checking if base metric has historical series with required label")
-		trsLabelCount, err := c.serieTimeRanges(ctx, fmt.Sprintf("count(%s) by (%s)", bareSelector.String(), name), rangeStart, rangeStep)
+		trsLabelCount, err := c.serieTimeRanges(ctx, fmt.Sprintf("count(%s) by (%s)", bareSelector.String(), name), rangeLookback, rangeStep)
 		if err != nil {
 			problems = append(problems, c.queryProblem(err, selector.String(), expr))
 			continue
@@ -229,7 +229,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 
 	// 4. If foo was ALWAYS there but it's NO LONGER there -> BUG
 	if len(trs.ranges) == 1 &&
-		!trs.oldest().After(rangeStart.Add(rangeStep)) &&
+		!trs.oldest().After(time.Now().Add(rangeLookback*-1).Add(rangeStep)) &&
 		trs.newest().Before(time.Now().Add(rangeStep*-1)) {
 		problems = append(problems, Problem{
 			Fragment: bareSelector.String(),
@@ -257,7 +257,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 	}
 	log.Debug().Str("check", c.Reporter()).Stringer("selector", &labelSelector).Stringer("matcher", lm).Msg("Checking if there are historical series matching filter")
 
-	trsLabel, err := c.serieTimeRanges(ctx, fmt.Sprintf("count(%s)", labelSelector.String()), rangeStart, rangeStep)
+	trsLabel, err := c.serieTimeRanges(ctx, fmt.Sprintf("count(%s)", labelSelector.String()), rangeLookback, rangeStep)
 	if err != nil {
 		problems = append(problems, c.queryProblem(err, labelSelector.String(), expr))
 		continue
@@ -290,7 +290,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule, entries []disc
 
 	// 6. If foo is ALWAYS/SOMETIMES there AND {bar OR baz} used to be there ALWAYS BUT it's NO LONGER there -> BUG
 	if len(trsLabel.ranges) == 1 &&
-		!trsLabel.oldest().After(rangeStart.Add(rangeStep)) &&
+		!trsLabel.oldest().After(time.Now().Add(rangeLookback*-1).Add(rangeStep)) &&
 		trsLabel.newest().Before(time.Now().Add(rangeStep*-1)) {
 		problems = append(problems, Problem{
 			Fragment: labelSelector.String(),
@@ -367,16 +367,16 @@ func (c SeriesCheck) instantSeriesCount(ctx context.Context, query string) (int,
 	return series, qr.URI, nil
 }
 
-func (c SeriesCheck) serieTimeRanges(ctx context.Context, query string, from time.Time, step time.Duration) (tr *timeRanges, err error) {
+func (c SeriesCheck) serieTimeRanges(ctx context.Context, query string, lookback, step time.Duration) (tr *timeRanges, err error) {
 	now := time.Now()
-	qr, err := c.prom.RangeQuery(ctx, query, from, now, step)
+	qr, err := c.prom.RangeQuery(ctx, query, lookback, step)
 	if err != nil {
 		return nil, err
 	}
 
 	tr = &timeRanges{
 		uri:   qr.URI,
-		from:  from,
+		from:  now.Add(lookback * -1),
 		until: now,
 		step:  step,
 	}
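
The window checks in hunks 4 and 6 above can be read as "the series existed from (roughly) the start of the lookback window, but has produced no samples within the last step". A sketch under the assumption that oldest() and newest() return the first and last timestamps that had samples (vanished is a hypothetical helper, not part of this commit):

```go
package main

import (
	"fmt"
	"time"
)

// vanished reports whether a series existed for the whole lookback window
// but has since disappeared.
func vanished(oldest, newest time.Time, lookback, step time.Duration) bool {
	windowStart := time.Now().Add(lookback * -1)
	presentSinceStart := !oldest.After(windowStart.Add(step)) // first sample within one step of the window start
	goneNow := newest.Before(time.Now().Add(step * -1))       // no samples within the last step
	return presentSinceStart && goneNow
}

func main() {
	oldest := time.Now().Add(-7 * 24 * time.Hour) // first sample, 7 days ago
	newest := time.Now().Add(-2 * time.Hour)      // last sample, 2 hours ago
	fmt.Println(vanished(oldest, newest, 7*24*time.Hour, 5*time.Minute)) // true
}
```
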
17 changes: 16 additions & 1 deletion internal/promapi/errors.go
@@ -1,6 +1,7 @@
 package promapi
 
 import (
+	"encoding/json"
 	"errors"
 	"net"
 	"strings"
@@ -19,6 +20,12 @@ func IsUnavailableError(err error) bool {
 	return true
 }
 
+type APIError struct {
+	Status    string       `json:"status"`
+	ErrorType v1.ErrorType `json:"errorType"`
+	Error     string       `json:"error"`
+}
+
 func CanRetryError(err error, delta time.Duration) (time.Duration, bool) {
 	if errors.Is(err, syscall.ECONNREFUSED) {
 		return delta, false
@@ -31,10 +38,18 @@ func CanRetryError(err error, delta time.Duration) (time.Duration, bool) {
 
 	var apiErr *v1.Error
 	if ok := errors.As(err, &apiErr); ok {
+		// {"status":"error","errorType":"timeout","error":"query timed out in expression evaluation"}
+		// Comes with 503 and ends up being reported as server_error but Detail contains raw JSON
+		// which we can parse and fix the error type
+		var v1e APIError
+		if err2 := json.Unmarshal([]byte(apiErr.Detail), &v1e); err2 == nil && v1e.ErrorType == v1.ErrTimeout {
+			apiErr.Type = v1.ErrTimeout
+		}
+
 		switch apiErr.Type {
 		case v1.ErrBadData:
 		case v1.ErrTimeout:
-			return (delta / 2).Round(time.Minute), true
+			return (delta / 4).Round(time.Minute), true
 		case v1.ErrCanceled:
 		case v1.ErrExec:
 			if strings.Contains(apiErr.Msg, "query processing would load too many samples into memory in ") {
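
The sample JSON from the new comment round-trips through the added struct as follows (an illustrative sketch; the struct literal mirrors APIError from the diff above, everything else is assumed):

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

func main() {
	// The body Prometheus sends with the 503 on a query timeout;
	// apiErr.Detail would carry it as a raw string.
	detail := `{"status":"error","errorType":"timeout","error":"query timed out in expression evaluation"}`
	var e struct {
		Status    string       `json:"status"`
		ErrorType v1.ErrorType `json:"errorType"`
		Error     string       `json:"error"`
	}
	if err := json.Unmarshal([]byte(detail), &e); err == nil && e.ErrorType == v1.ErrTimeout {
		// Reclassify the generic server_error as a timeout so the caller
		// retries with a smaller range instead of giving up.
		fmt.Println("query timeout:", e.Error)
	}
}
```
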
4 changes: 2 additions & 2 deletions internal/promapi/failover.go
@@ -81,11 +81,11 @@ func (fg *FailoverGroup) Query(ctx context.Context, expr string) (qr *QueryResul
 	return nil, &FailoverGroupError{err: err, uri: uri, isStrict: fg.strictErrors}
 }
 
-func (fg *FailoverGroup) RangeQuery(ctx context.Context, expr string, start, end time.Time, step time.Duration) (rqr *RangeQueryResult, err error) {
+func (fg *FailoverGroup) RangeQuery(ctx context.Context, expr string, lookback, step time.Duration) (rqr *RangeQueryResult, err error) {
 	var uri string
 	for _, prom := range fg.servers {
 		uri = prom.uri
-		rqr, err = prom.RangeQuery(ctx, expr, start, end, step)
+		rqr, err = prom.RangeQuery(ctx, expr, lookback, step)
 		if err == nil {
 			return
 		}

42 changes: 25 additions & 17 deletions internal/promapi/range.go
@@ -22,43 +22,50 @@ type RangeQueryResult struct {
 	DurationSeconds float64
 }
 
-func (p *Prometheus) RangeQuery(ctx context.Context, expr string, start, end time.Time, step time.Duration) (*RangeQueryResult, error) {
-	start = start.Round(time.Second)
-	end = end.Round(time.Second)
-
+func (p *Prometheus) RangeQuery(ctx context.Context, expr string, lookback, step time.Duration) (*RangeQueryResult, error) {
 	log.Debug().
 		Str("uri", p.uri).
 		Str("query", expr).
-		Time("start", start).
-		Time("end", end).
+		Str("delta", output.HumanizeDuration(lookback)).
 		Str("step", output.HumanizeDuration(step)).
 		Msg("Scheduling prometheus range query")
 
 	lockKey := "/api/v1/query/range"
 	p.lock.lock(lockKey)
 	defer p.lock.unlock(lockKey)
 
-	cacheKey := strings.Join([]string{expr, start.String(), end.String(), step.String()}, "\n")
-	return p.realRangeQuery(ctx, expr, start, end, step, cacheKey, false)
+	cacheKey := strings.Join([]string{expr, lookback.String(), step.String()}, "\n")
+	return p.realRangeQuery(ctx, expr, lookback, step, cacheKey, false)
 }
 
 func (p *Prometheus) realRangeQuery(
 	ctx context.Context,
-	expr string, start, end time.Time, step time.Duration,
+	expr string, lookback, step time.Duration,
 	cacheKey string, isRetry bool,
 ) (*RangeQueryResult, error) {
 	if v, ok := p.cache.Get(cacheKey); ok {
-		log.Debug().Str("key", cacheKey).Str("uri", p.uri).Msg("Range query cache hit")
+		log.Debug().
+			Str("uri", p.uri).
+			Str("query", expr).
+			Str("delta", output.HumanizeDuration(lookback)).
+			Str("step", output.HumanizeDuration(step)).
+			Msg("Cache hit")
 		prometheusCacheHitsTotal.WithLabelValues(p.name, "/api/v1/query_range").Inc()
 		r := v.(RangeQueryResult)
 		return &r, nil
 	}
-	log.Debug().Str("key", cacheKey).Str("uri", p.uri).Msg("Range query cache miss")
+	log.Debug().
+		Str("uri", p.uri).
+		Str("query", expr).
+		Str("delta", output.HumanizeDuration(lookback)).
+		Str("step", output.HumanizeDuration(step)).
+		Msg("Cache miss")
 
 	prometheusQueriesTotal.WithLabelValues(p.name, "/api/v1/query_range").Inc()
+	now := time.Now()
 	r := v1.Range{
-		Start: start,
-		End:   end,
+		Start: now.Add(lookback * -1),
+		End:   now,
 		Step:  step,
 	}

@@ -82,6 +89,7 @@ func (p *Prometheus) realRangeQuery(
 	log.Debug().
 		Str("uri", p.uri).
 		Str("query", expr).
+		Str("delta", output.HumanizeDuration(lookback)).
 		Bool("retry", isRetry).
 		Msg("Executing range query")
 	qstart := time.Now()
@@ -95,7 +103,7 @@ func (p *Prometheus) realRangeQuery(
 	if err != nil {
 		log.Error().Err(err).Str("uri", p.uri).Str("query", expr).Msg("Range query failed")
 		prometheusQueryErrorsTotal.WithLabelValues(p.name, "/api/v1/query_range", errReason(err)).Inc()
-		if delta, retryOK := CanRetryError(err, end.Sub(start)); retryOK {
+		if delta, retryOK := CanRetryError(err, lookback); retryOK {
			if delta < step*2 {
 				log.Error().Str("uri", p.uri).Str("query", expr).Msg("No more retries possible")
 				return nil, errors.New("no more retries possible")
Expand All @@ -105,16 +113,16 @@ func (p *Prometheus) realRangeQuery(
p.slowQueryCache.Remove(expr)
p.slowQueryCache.Add(expr, delta)
p.slowQueryLock.Unlock()
return p.realRangeQuery(ctx, expr, end.Add(delta*-1), end, step, cacheKey, true)
return p.realRangeQuery(ctx, expr, delta, step, cacheKey, true)
}
return nil, err
}

qr := RangeQueryResult{
URI: p.uri,
DurationSeconds: duration.Seconds(),
Start: start,
End: end,
Start: r.Start,
End: r.End,
}

switch result.Type() {
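
With the new signature, identical queries share a cache entry no matter when they are issued. A hypothetical usage sketch, assuming prom is a configured *Prometheus and ctx a context.Context:

```go
// Both calls build the same cache key ("up == 0\n168h0m0s\n5m0s"), so the
// second is served from cache even though time.Now() has advanced between them.
qr1, err := prom.RangeQuery(ctx, `up == 0`, time.Hour*24*7, time.Minute*5)
if err != nil {
	// handle the query error
}
qr2, _ := prom.RangeQuery(ctx, `up == 0`, time.Hour*24*7, time.Minute*5) // cache hit
_, _ = qr1, qr2
```
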
