Skip to content

Commit

Permalink
Use series API for query/series check
Browse files Browse the repository at this point in the history
  • Loading branch information
prymitive committed Feb 8, 2022
1 parent a34cb6e commit 128bd8b
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 49 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [next]

### Changed

- `query/series` check now uses the `/api/v1/series` Prometheus API instead of
  running instant queries.

## v0.9.0

### Changed
Expand Down
10 changes: 2 additions & 8 deletions internal/checks/query_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule) (problems []Pr
}

func (c SeriesCheck) countSeries(ctx context.Context, expr parser.PromQLExpr, selector promParser.VectorSelector) (problems []Problem) {
q := fmt.Sprintf("count(%s)", selector.String())
qr, err := c.prom.Query(ctx, q)
sets, err := c.prom.Series(ctx, []string{selector.String()})
if err != nil {
problems = append(problems, Problem{
Fragment: selector.String(),
Expand All @@ -72,12 +71,7 @@ func (c SeriesCheck) countSeries(ctx context.Context, expr parser.PromQLExpr, se
return
}

var series int
for _, s := range qr.Series {
series += int(s.Value)
}

if series == 0 {
if len(sets) == 0 {
if len(selector.LabelMatchers) > 1 {
// retry selector with only __name__ label
s := stripLabels(selector)
Expand Down
66 changes: 25 additions & 41 deletions internal/checks/query_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package checks_test
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -16,82 +17,65 @@ func TestSeriesCheck(t *testing.T) {
zerolog.SetGlobalLevel(zerolog.FatalLevel)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
t.Fatal(err)
}
query := r.Form.Get("query")
match := r.URL.Query()[("match[]")]

switch query {
case "count(notfound)", `count(notfound{job="foo"})`, `count(notfound{job!="foo"})`, `count({__name__="notfound",job="bar"})`:
switch strings.Join(match, ", ") {
case "notfound", `notfound{job="foo"}`, `notfound{job!="foo"}`, `{__name__="notfound",job="bar"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[]
}
"data": []
}`))
case "count(found_1)", `count({__name__="notfound"})`:
case "found_1", `{__name__="notfound"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"single", "foo": "bar"}]
}`))
case "count(found_7)":
case "found_7":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"7"]}]
}
"data": [
{"__name__":"seven", "id": "1"},
{"__name__":"seven", "id": "2"},
{"__name__":"seven", "id": "3"},
{"__name__":"seven", "id": "4"},
{"__name__":"seven", "id": "5"},
{"__name__":"seven", "id": "6"},
{"__name__":"seven", "id": "7"}
]
}`))
case `count(node_filesystem_readonly{mountpoint!=""})`:
case `node_filesystem_readonly{mountpoint!=""}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"single", "foo": "bar"}]
}`))
case `count(disk_info{interface_speed!="6.0 Gb/s",type="sat"})`:
case `disk_info{interface_speed!="6.0 Gb/s",type="sat"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"disk_info"}]
}`))
case `count(found{job="notfound"})`, `count(notfound{job="notfound"})`:
case `found{job="notfound"}`, `notfound{job="notfound"}`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[]
}
"data": []
}`))
case `count(found)`:
case `found`:
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[{"metric":{},"value":[1614859502.068,"1"]}]
}
"data": [{"__name__":"found"}]
}`))
default:
w.WriteHeader(400)
Expand Down
56 changes: 56 additions & 0 deletions internal/promapi/series.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package promapi

import (
"context"
"strings"
"time"

"github.com/prometheus/common/model"
"github.com/rs/zerolog/log"
)

type SeriesResult struct {
Series model.Vector
}

func (p *Prometheus) Series(ctx context.Context, matches []string) ([]model.LabelSet, error) {
log.Debug().Str("uri", p.uri).Strs("matches", matches).Msg("Scheduling prometheus series query")

lockKey := strings.Join(matches, ",")
p.lock.lock(lockKey)
defer p.lock.unlock((lockKey))

if v, ok := p.cache.Get(lockKey); ok {
log.Debug().Str("key", lockKey).Str("uri", p.uri).Msg("Query cache hit")
r := v.([]model.LabelSet)
return r, nil
}

log.Debug().Str("uri", p.uri).Strs("matches", matches).Msg("Query started")

ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()

start := time.Now()
result, _, err := p.api.Series(ctx, matches, start.Add(time.Minute*-5), start)
duration := time.Since(start)
log.Debug().
Str("uri", p.uri).
Strs("matches", matches).
Str("duration", HumanizeDuration(duration)).
Msg("Query completed")
if err != nil {
log.Error().Err(err).
Str("uri", p.uri).
Strs("matches", matches).
Msg("Query failed")
return nil, err
}

log.Debug().Str("uri", p.uri).Strs("matches", matches).Int("series", len(result)).Msg("Parsed response")

log.Debug().Str("key", lockKey).Str("uri", p.uri).Msg("Query cache miss")
p.cache.Add(lockKey, result)

return result, nil
}
183 changes: 183 additions & 0 deletions internal/promapi/series_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package promapi_test

import (
"context"
"net/http"
"net/http/httptest"
"regexp"
"strings"
"sync"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

"github.com/cloudflare/pint/internal/promapi"
)

func TestSeries(t *testing.T) {
done := sync.Map{}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
match := r.URL.Query()[("match[]")]

switch strings.Join(match, ", ") {
case "empty":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status": "success",
"data": []
}`))
case "single":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status": "success",
"data": [{"__name__":"single", "foo": "bar"}]
}`))
case "two":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status": "success",
"data": [
{"__name__":"two", "foo": "bar"},
{"__name__":"two", "foo": "baz"}
]
}`))
case "single, two":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status": "success",
"data": [
{"__name__":"single", "foo": "bar"},
{"__name__":"two", "foo": "bar"},
{"__name__":"two", "foo": "baz"}
]
}`))
case "timeout":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
time.Sleep(time.Second)
_, _ = w.Write([]byte(`{
"status":"success",
"data":{
"resultType":"vector",
"result":[]
}
}`))
case "once":
if _, wasDone := done.Load(match); wasDone {
w.WriteHeader(500)
_, _ = w.Write([]byte("query already requested\n"))
return
}
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status": "success",
"data": [
{"__name__":"once", "foo": "bar"},
{"__name__":"once", "foo": "baz"}
]
}`))
done.Store(match, true)
default:
w.WriteHeader(400)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"status":"error",
"errorType":"bad_data",
"error":"unhandled query"
}`))
}
}))
defer srv.Close()

type testCaseT struct {
matches []string
timeout time.Duration
series []model.LabelSet
err *regexp.Regexp
runs int
}

testCases := []testCaseT{
{
matches: []string{"empty"},
timeout: time.Second,
series: []model.LabelSet{},
runs: 5,
},
{
matches: []string{"single"},
timeout: time.Second,
series: []model.LabelSet{
{model.MetricNameLabel: "single", "foo": "bar"},
},
runs: 5,
},
{
matches: []string{"two"},
timeout: time.Second,
series: []model.LabelSet{
{model.MetricNameLabel: "two", "foo": "bar"},
{model.MetricNameLabel: "two", "foo": "baz"},
},
runs: 5,
},
{
matches: []string{"single", "two"},
timeout: time.Second,
series: []model.LabelSet{
{model.MetricNameLabel: "single", "foo": "bar"},
{model.MetricNameLabel: "two", "foo": "bar"},
{model.MetricNameLabel: "two", "foo": "baz"},
},
runs: 5,
},
{
matches: []string{"timeout"},
timeout: time.Millisecond * 50,
series: []model.LabelSet{},
runs: 5,
err: regexp.MustCompile(`Get "http.+\/api\/v1\/series\?end=.+&match%5B%5D=timeout&start=.+": context deadline exceeded`),
},
{
matches: []string{"error"},
timeout: time.Second,
series: []model.LabelSet{},
runs: 5,
err: regexp.MustCompile("unhandled query"),
},
}

for _, tc := range testCases {
t.Run(strings.Join(tc.matches, ","), func(t *testing.T) {
assert := assert.New(t)

prom := promapi.NewPrometheus("test", srv.URL, tc.timeout)

wg := sync.WaitGroup{}
wg.Add(tc.runs)
for i := 1; i <= tc.runs; i++ {
go func() {
m, err := prom.Series(context.Background(), tc.matches)
if tc.err != nil {
assert.Error(err)
assert.True(tc.err.MatchString(err.Error()))
} else {
assert.NoError(err)
}
if m != nil {
assert.Equal(tc.series, m)
}
wg.Done()
}()
}
wg.Wait()
})
}
}

0 comments on commit 128bd8b

Please sign in to comment.