Skip to content

Commit

Permalink
Refactor prometheus query loop
Browse files Browse the repository at this point in the history
  • Loading branch information
prymitive committed Oct 21, 2022
1 parent c01ca0b commit a7ae5e1
Showing 1 changed file with 51 additions and 47 deletions.
98 changes: 51 additions & 47 deletions internal/promapi/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,59 +146,63 @@ func (prom *Prometheus) doRequest(ctx context.Context, method, path string, args

func queryWorker(prom *Prometheus, queries chan queryRequest) {
for job := range queries {
job := job

cacheKey := job.query.CacheKey()
if cacheKey != "" {
if cached, ok := prom.cache.Get(cacheKey); ok {
job.result <- cached.(queryResult)
prometheusCacheHitsTotal.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
log.Debug().
Str("uri", prom.uri).
Str("query", job.query.String()).
Str("key", cacheKey).
Msg("Cache hit")
continue
}
}
prometheusCacheMissTotal.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
log.Debug().
Str("uri", prom.uri).
Str("query", job.query.String()).
Str("key", cacheKey).
Msg("Cache miss")

prometheusQueriesTotal.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
prometheusQueriesRunning.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
prom.rateLimiter.Take()
start := time.Now()
result := job.query.Run()
dur := time.Since(start)
log.Debug().
Str("uri", prom.uri).
Str("query", job.query.String()).
Str("endpoint", job.query.Endpoint()).
Str("duration", output.HumanizeDuration(dur)).
Msg("Query completed")
prometheusQueriesRunning.WithLabelValues(prom.name, job.query.Endpoint()).Dec()
if result.err != nil {
prometheusQueryErrorsTotal.WithLabelValues(prom.name, job.query.Endpoint(), errReason(result.err)).Inc()
log.Error().
Err(result.err).
job.result <- processJob(prom, job)
}
}

func processJob(prom *Prometheus, job queryRequest) queryResult {
cacheKey := job.query.CacheKey()
if cacheKey != "" {
if cached, ok := prom.cache.Get(cacheKey); ok {
prometheusCacheHitsTotal.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
log.Debug().
Str("uri", prom.uri).
Str("query", job.query.String()).
Msg("Query returned an error")
job.result <- result
continue
Str("key", cacheKey).
Msg("Cache hit")
return cached.(queryResult)
}
}

if cacheKey != "" {
prom.cache.Add(cacheKey, result)
}
prometheusCacheSize.WithLabelValues(prom.name).Set(float64(prom.cache.Len()))
prometheusCacheMissTotal.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
log.Debug().
Str("uri", prom.uri).
Str("query", job.query.String()).
Str("key", cacheKey).
Msg("Cache miss")

prometheusQueriesTotal.WithLabelValues(prom.name, job.query.Endpoint()).Inc()
prometheusQueriesRunning.WithLabelValues(prom.name, job.query.Endpoint()).Inc()

job.result <- result
prom.rateLimiter.Take()
start := time.Now()
result := job.query.Run()
dur := time.Since(start)
log.Debug().
Str("uri", prom.uri).
Str("query", job.query.String()).
Str("endpoint", job.query.Endpoint()).
Str("duration", output.HumanizeDuration(dur)).
Msg("Query completed")
prometheusQueriesRunning.WithLabelValues(prom.name, job.query.Endpoint()).Dec()

if result.err != nil {
prometheusQueryErrorsTotal.WithLabelValues(prom.name, job.query.Endpoint(), errReason(result.err)).Inc()
log.Error().
Err(result.err).
Str("uri", prom.uri).
Str("query", job.query.String()).
Msg("Query returned an error")
return result
}

if cacheKey != "" {
prom.cache.Add(cacheKey, result)
}

prometheusCacheSize.WithLabelValues(prom.name).Set(float64(prom.cache.Len()))

return result
}

func formatTime(t time.Time) string {
Expand Down

0 comments on commit a7ae5e1

Please sign in to comment.