Skip to content

Commit

Permalink
Use series API for query/series check
Browse files Browse the repository at this point in the history
  • Loading branch information
prymitive committed Feb 8, 2022
1 parent a34cb6e commit 94c4362
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 50 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [next]

### Changed

- `query/series` check now uses the `/api/v1/series` Prometheus API instead of running
  instant queries.

## v0.9.0

### Changed
Expand Down
2 changes: 1 addition & 1 deletion cmd/pint/tests/0043_watch_cancel.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pint.ok --no-color watch --interval=1h --listen=:6043 --pidfile=pint.pid rules
! stdout .
stderr 'level=info msg="Shutting down"'
stderr 'level=error msg="Failed to query Prometheus configuration" error="Get \\"http://127.0.0.1:7043/api/v1/status/config\\": context canceled" uri=http://127.0.0.1:7043'
stderr 'level=error msg="Query failed" error="Post \\"http://127.0.0.1:7043/api/v1/query\\": context canceled" query=count\(foo\) uri=http://127.0.0.1:7043'
stderr 'level=error msg="Query failed" error="Get \\"http://127.0.0.1:7043\/api\/v1\/series\?end=.+&match%5B%5D=foo&start=.+\\": context canceled" matches=\["foo"\] uri=http://127.0.0.1:7043'
stderr 'level=info msg="Waiting for all background tasks to finish"'
stderr 'level=info msg="Background worker finished"'

Expand Down
10 changes: 2 additions & 8 deletions internal/checks/query_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule) (problems []Pr
}

func (c SeriesCheck) countSeries(ctx context.Context, expr parser.PromQLExpr, selector promParser.VectorSelector) (problems []Problem) {
q := fmt.Sprintf("count(%s)", selector.String())
qr, err := c.prom.Query(ctx, q)
sets, err := c.prom.Series(ctx, []string{selector.String()})
if err != nil {
problems = append(problems, Problem{
Fragment: selector.String(),
Expand All @@ -72,12 +71,7 @@ func (c SeriesCheck) countSeries(ctx context.Context, expr parser.PromQLExpr, se
return
}

var series int
for _, s := range qr.Series {
series += int(s.Value)
}

if series == 0 {
if len(sets) == 0 {
if len(selector.LabelMatchers) > 1 {
// retry selector with only __name__ label
s := stripLabels(selector)
Expand Down
66 changes: 25 additions & 41 deletions internal/checks/query_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package checks_test
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -16,82 +17,65 @@ func TestSeriesCheck(t *testing.T) {
zerolog.SetGlobalLevel(zerolog.FatalLevel)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
t.Fatal(err)
}
query := r.Form.Get("query")
match := r.URL.Query()[("match[]")]

switch query {
case "count(notfound)", `count(notfound{job="foo"})`, `count(notfound{job!="foo"})`, `count({__name__="notfound",job="bar"})`:
switch strings.Join(match, ", ") {
case "notfound", `notfound{job="foo"}`, `notfound{job!="foo"}`, `{__name__="notfound",job="bar"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[]
}
"data": []
}`))
case "count(found_1)", `count({__name__="notfound"})`:
case "found_1", `{__name__="notfound"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"single", "foo": "bar"}]
}`))
case "count(found_7)":
case "found_7":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"7"]}]
}
"data": [
{"__name__":"seven", "id": "1"},
{"__name__":"seven", "id": "2"},
{"__name__":"seven", "id": "3"},
{"__name__":"seven", "id": "4"},
{"__name__":"seven", "id": "5"},
{"__name__":"seven", "id": "6"},
{"__name__":"seven", "id": "7"}
]
}`))
case `count(node_filesystem_readonly{mountpoint!=""})`:
case `node_filesystem_readonly{mountpoint!=""}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"single", "foo": "bar"}]
}`))
case `count(disk_info{interface_speed!="6.0 Gb/s",type="sat"})`:
case `disk_info{interface_speed!="6.0 Gb/s",type="sat"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"disk_info"}]
}`))
case `count(found{job="notfound"})`, `count(notfound{job="notfound"})`:
case `found{job="notfound"}`, `notfound{job="notfound"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[]
}
"data": []
}`))
case `count(found)`:
case `found`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"found"}]
}`))
default:
w.WriteHeader(400)
Expand Down
56 changes: 56 additions & 0 deletions internal/promapi/series.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package promapi

import (
"context"
"strings"
"time"

"github.com/prometheus/common/model"
"github.com/rs/zerolog/log"
)

// SeriesResult wraps the series returned by a Prometheus query.
// NOTE(review): not referenced anywhere in this file — Series below returns
// []model.LabelSet directly. Presumably consumed elsewhere (or leftover from
// an earlier iteration); confirm before removing.
type SeriesResult struct {
	Series model.Vector
}

// Series queries the Prometheus /api/v1/series endpoint for all series
// matching the given selectors over the last 5 minutes and returns their
// label sets. Results are cached per joined matcher list, so repeated calls
// with identical matchers are served from the cache. Errors from the API
// (including context cancellation / the configured timeout) are returned
// to the caller after being logged.
func (p *Prometheus) Series(ctx context.Context, matches []string) ([]model.LabelSet, error) {
	log.Debug().Str("uri", p.uri).Strs("matches", matches).Msg("Scheduling prometheus series query")

	// Serialize identical queries so only one request per key is in flight;
	// the same key doubles as the cache key.
	lockKey := strings.Join(matches, ",")
	p.lock.lock(lockKey)
	defer p.lock.unlock(lockKey)

	if v, ok := p.cache.Get(lockKey); ok {
		log.Debug().Str("key", lockKey).Str("uri", p.uri).Msg("Query cache hit")
		r := v.([]model.LabelSet)
		return r, nil
	}

	log.Debug().Str("uri", p.uri).Strs("matches", matches).Msg("Query started")

	ctx, cancel := context.WithTimeout(ctx, p.timeout)
	defer cancel()

	start := time.Now()
	// Query a 5 minute window ending now. API warnings are deliberately
	// ignored: only the matched series are needed for the check.
	result, _, err := p.api.Series(ctx, matches, start.Add(time.Minute*-5), start)
	duration := time.Since(start)
	log.Debug().
		Str("uri", p.uri).
		Strs("matches", matches).
		Str("duration", HumanizeDuration(duration)).
		Msg("Query completed")
	if err != nil {
		log.Error().Err(err).
			Str("uri", p.uri).
			Strs("matches", matches).
			Msg("Query failed")
		return nil, err
	}

	log.Debug().Str("uri", p.uri).Strs("matches", matches).Int("series", len(result)).Msg("Parsed response")

	log.Debug().Str("key", lockKey).Str("uri", p.uri).Msg("Query cache miss")
	p.cache.Add(lockKey, result)

	return result, nil
}
Loading

0 comments on commit 94c4362

Please sign in to comment.