From 128bd8b52c2571433b530da412308f74abe88a18 Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Tue, 8 Feb 2022 12:22:04 +0000 Subject: [PATCH] Use series API for query/series check --- CHANGELOG.md | 7 + internal/checks/query_series.go | 10 +- internal/checks/query_series_test.go | 66 ++++------ internal/promapi/series.go | 56 ++++++++ internal/promapi/series_test.go | 183 +++++++++++++++++++++++++++ 5 files changed, 273 insertions(+), 49 deletions(-) create mode 100644 internal/promapi/series.go create mode 100644 internal/promapi/series_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e6e00c7f..d4dc2674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [next] + +### Changed + +- `query/series` check now uses `/api/v1/series` Prometheus API instead of running + instant queries. + ## v0.9.0 ### Changed diff --git a/internal/checks/query_series.go b/internal/checks/query_series.go index 0499b9ad..310f95cc 100644 --- a/internal/checks/query_series.go +++ b/internal/checks/query_series.go @@ -59,8 +59,7 @@ func (c SeriesCheck) Check(ctx context.Context, rule parser.Rule) (problems []Pr } func (c SeriesCheck) countSeries(ctx context.Context, expr parser.PromQLExpr, selector promParser.VectorSelector) (problems []Problem) { - q := fmt.Sprintf("count(%s)", selector.String()) - qr, err := c.prom.Query(ctx, q) + sets, err := c.prom.Series(ctx, []string{selector.String()}) if err != nil { problems = append(problems, Problem{ Fragment: selector.String(), @@ -72,12 +71,7 @@ func (c SeriesCheck) countSeries(ctx context.Context, expr parser.PromQLExpr, se return } - var series int - for _, s := range qr.Series { - series += int(s.Value) - } - - if series == 0 { + if len(sets) == 0 { if len(selector.LabelMatchers) > 1 { // retry selector with only __name__ label s := stripLabels(selector) diff --git a/internal/checks/query_series_test.go b/internal/checks/query_series_test.go index d497a033..5b8c5955 100644 --- a/internal/checks/query_series_test.go +++ b/internal/checks/query_series_test.go @@ -3,6 +3,7 @@ package checks_test import ( "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -16,82 +17,65 @@ func TestSeriesCheck(t *testing.T) { zerolog.SetGlobalLevel(zerolog.FatalLevel) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err != nil { - t.Fatal(err) - } - query := r.Form.Get("query") + match := r.URL.Query()[("match[]")] - switch query { - case "count(notfound)", `count(notfound{job="foo"})`, `count(notfound{job!="foo"})`, `count({__name__="notfound",job="bar"})`: + switch strings.Join(match, ", ") { + case "notfound", `notfound{job="foo"}`, `notfound{job!="foo"}`, `{__name__="notfound",job="bar"}`: w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[] - } + "data": [] }`)) - case "count(found_1)", `count({__name__="notfound"})`: + case "found_1", `{__name__="notfound"}`: w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[{"metric":{},"value":[1614859502.068,"1"]}] - } + "data": [{"__name__":"single", "foo": "bar"}] }`)) - case "count(found_7)": + case "found_7": w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[{"metric":{},"value":[1614859502.068,"7"]}] - } + "data": [ + {"__name__":"seven", "id": "1"}, + {"__name__":"seven", "id": "2"}, + {"__name__":"seven", "id": "3"}, + {"__name__":"seven", "id": "4"}, + {"__name__":"seven", "id": "5"}, + {"__name__":"seven", "id": "6"}, + {"__name__":"seven", "id": "7"} + ] }`)) - case `count(node_filesystem_readonly{mountpoint!=""})`: + case `node_filesystem_readonly{mountpoint!=""}`: w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[{"metric":{},"value":[1614859502.068,"1"]}] - } + "data": [{"__name__":"single", "foo": "bar"}] }`)) - case `count(disk_info{interface_speed!="6.0 Gb/s",type="sat"})`: + case `disk_info{interface_speed!="6.0 Gb/s",type="sat"}`: w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[{"metric":{},"value":[1614859502.068,"1"]}] - } + "data": [{"__name__":"disk_info"}] }`)) - case `count(found{job="notfound"})`, `count(notfound{job="notfound"})`: + case `found{job="notfound"}`, `notfound{job="notfound"}`: w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[] - } + "data": [] }`)) - case `count(found)`: + case `found`: w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{ "status":"success", - "data":{ - "resultType":"vector", - "result":[{"metric":{},"value":[1614859502.068,"1"]}] - } + "data": [{"__name__":"found"}] }`)) default: w.WriteHeader(400) diff --git a/internal/promapi/series.go b/internal/promapi/series.go new file mode 100644 index 00000000..68ccc52b --- /dev/null +++ b/internal/promapi/series.go @@ -0,0 +1,56 @@ +package promapi + +import ( + "context" + "strings" + "time" + + "github.com/prometheus/common/model" + "github.com/rs/zerolog/log" +) + +type SeriesResult struct { + Series model.Vector +} + +func (p *Prometheus) Series(ctx context.Context, matches []string) ([]model.LabelSet, error) { + log.Debug().Str("uri", p.uri).Strs("matches", matches).Msg("Scheduling prometheus series query") + + lockKey := strings.Join(matches, ",") + p.lock.lock(lockKey) + defer p.lock.unlock((lockKey)) + + if v, ok := p.cache.Get(lockKey); ok { + log.Debug().Str("key", lockKey).Str("uri", p.uri).Msg("Query cache hit") + r := v.([]model.LabelSet) + return r, nil + } + + log.Debug().Str("uri", p.uri).Strs("matches", matches).Msg("Query started") + + ctx, cancel := context.WithTimeout(ctx, p.timeout) + defer cancel() + + start := time.Now() + result, _, err := p.api.Series(ctx, matches, start.Add(time.Minute*-5), start) + duration := time.Since(start) + log.Debug(). + Str("uri", p.uri). + Strs("matches", matches). + Str("duration", HumanizeDuration(duration)). + Msg("Query completed") + if err != nil { + log.Error().Err(err). + Str("uri", p.uri). + Strs("matches", matches). + Msg("Query failed") + return nil, err + } + + log.Debug().Str("uri", p.uri).Strs("matches", matches).Int("series", len(result)).Msg("Parsed response") + + log.Debug().Str("key", lockKey).Str("uri", p.uri).Msg("Query cache miss") + p.cache.Add(lockKey, result) + + return result, nil +} diff --git a/internal/promapi/series_test.go b/internal/promapi/series_test.go new file mode 100644 index 00000000..a5ca97dc --- /dev/null +++ b/internal/promapi/series_test.go @@ -0,0 +1,183 @@ +package promapi_test + +import ( + "context" + "net/http" + "net/http/httptest" + "regexp" + "strings" + "sync" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + + "github.com/cloudflare/pint/internal/promapi" +) + +func TestSeries(t *testing.T) { + done := sync.Map{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + match := r.URL.Query()[("match[]")] + + switch strings.Join(match, ", ") { + case "empty": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": [] + }`)) + case "single": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": [{"__name__":"single", "foo": "bar"}] + }`)) + case "two": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": [ + {"__name__":"two", "foo": "bar"}, + {"__name__":"two", "foo": "baz"} + ] + }`)) + case "single, two": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": [ + {"__name__":"single", "foo": "bar"}, + {"__name__":"two", "foo": "bar"}, + {"__name__":"two", "foo": "baz"} + ] + }`)) + case "timeout": + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + time.Sleep(time.Second) + _, _ = w.Write([]byte(`{ + "status":"success", + "data":{ + "resultType":"vector", + "result":[] + } + }`)) + case "once": + if _, wasDone := done.Load(match); wasDone { + w.WriteHeader(500) + _, _ = w.Write([]byte("query already requested\n")) + return + } + w.WriteHeader(200) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status": "success", + "data": [ + {"__name__":"once", "foo": "bar"}, + {"__name__":"once", "foo": "baz"} + ] + }`)) + done.Store(match, true) + default: + w.WriteHeader(400) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "status":"error", + "errorType":"bad_data", + "error":"unhandled query" + }`)) + } + })) + defer srv.Close() + + type testCaseT struct { + matches []string + timeout time.Duration + series []model.LabelSet + err *regexp.Regexp + runs int + } + + testCases := []testCaseT{ + { + matches: []string{"empty"}, + timeout: time.Second, + series: []model.LabelSet{}, + runs: 5, + }, + { + matches: []string{"single"}, + timeout: time.Second, + series: []model.LabelSet{ + {model.MetricNameLabel: "single", "foo": "bar"}, + }, + runs: 5, + }, + { + matches: []string{"two"}, + timeout: time.Second, + series: []model.LabelSet{ + {model.MetricNameLabel: "two", "foo": "bar"}, + {model.MetricNameLabel: "two", "foo": "baz"}, + }, + runs: 5, + }, + { + matches: []string{"single", "two"}, + timeout: time.Second, + series: []model.LabelSet{ + {model.MetricNameLabel: "single", "foo": "bar"}, + {model.MetricNameLabel: "two", "foo": "bar"}, + {model.MetricNameLabel: "two", "foo": "baz"}, + }, + runs: 5, + }, + { + matches: []string{"timeout"}, + timeout: time.Millisecond * 50, + series: []model.LabelSet{}, + runs: 5, + err: regexp.MustCompile(`Get "http.+\/api\/v1\/series\?end=.+&match%5B%5D=timeout&start=.+": context deadline exceeded`), + }, + { + matches: []string{"error"}, + timeout: time.Second, + series: []model.LabelSet{}, + runs: 5, + err: regexp.MustCompile("unhandled query"), + }, + } + + for _, tc := range testCases { + t.Run(strings.Join(tc.matches, ","), func(t *testing.T) { + assert := assert.New(t) + + prom := promapi.NewPrometheus("test", srv.URL, tc.timeout) + + wg := sync.WaitGroup{} + wg.Add(tc.runs) + for i := 1; i <= tc.runs; i++ { + go func() { + m, err := prom.Series(context.Background(), tc.matches) + if tc.err != nil { + assert.Error(err) + assert.True(tc.err.MatchString(err.Error())) + } else { + assert.NoError(err) + } + if m != nil { + assert.Equal(tc.series, m) + } + wg.Done() + }() + } + wg.Wait() + }) + } +}