Skip to content

Commit

Permalink
Added initial PromQL acceptance tests.
Browse files Browse the repository at this point in the history
* improved logging.
* improved debug matching info.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Dec 18, 2020
1 parent 4d30ab7 commit 81af0bd
Show file tree
Hide file tree
Showing 29 changed files with 3,743 additions and 157 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func runRule(

// Start gRPC server.
{
tsdbStore := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, lset)

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand All @@ -684,7 +684,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,12 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.MatchersToString(matchers...))
q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand Down
3 changes: 2 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
sms, err := storepb.TranslatePromMatchers(ms...)
sms, err := storepb.PromMatchersToMatchers(ms...)
if err != nil {
return nil, errors.Wrap(err, "convert matchers")
}
Expand All @@ -265,6 +265,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
Expand Down
11 changes: 5 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand All @@ -41,7 +40,7 @@ type sample struct {
}

func TestQueryableCreator_MaxResolution(t *testing.T) {
testProxy := &storeServer{resps: []*storepb.SeriesResponse{}}
testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}}
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
Expand All @@ -60,7 +59,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {

// Tests E2E how PromQL works with downsampled data.
func TestQuerier_DownsampledData(t *testing.T) {
testProxy := &storeServer{
testProxy := &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store.
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b", "bbbb", "eee"), []sample{{99, 3}, {199, 8}}), // Downsampled chunk from Store.
Expand Down Expand Up @@ -411,7 +410,7 @@ func TestQuerier_Select(t *testing.T) {
}{
{
name: "select overlapping data with partial error",
storeAPI: &storeServer{
storeAPI: &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
Expand Down Expand Up @@ -1468,14 +1467,14 @@ func BenchmarkDedupSeriesIterator(b *testing.B) {
})
}

type storeServer struct {
type testStoreServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.StoreServer

resps []*storepb.SeriesResponse
}

func (s *storeServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *testStoreServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
for _, resp := range s.resps {
err := srv.Send(resp)
if err != nil {
Expand Down
133 changes: 133 additions & 0 deletions pkg/query/query_reducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package query

/**
// TODO(bwplotka): Move to separate package?
func TestQuerierWithMapReducing(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
timeout := 100 * time.Second
// Readable example of extemely shared store APIs and their responses.
// NOTE: Series are sorted within label sets. Within series samples are sorted by time. This is all by StoreAPI contract.
// Non overlapping samples.
storeResponsesShardedBySomeSeriesByTime := [][][]*storepb.SeriesResponse{
// Vertical shards (series).
{
// Horizontal shards (time).
{
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "a"), []sample{{0, 0}, {2, 3}, {3, 51}}),
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "a"), []sample{{5, 65}, {6, 24}, {7, 321}}), // Reset.
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "b"), []sample{{2, 4}, {3, 24}, {4, 33}}, []sample{{5, 77}, {6, 94}, {7, 360}}),
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "c"), []sample{{100, 1}, {300, 3}, {400, 54}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "a"), []sample{{0, -24}, {2, 53}, {3, -2}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "a"), []sample{{5, 10}, {6, 23}, {7, -30}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "b"), []sample{{2, 24}, {3, -12}, {4, 4}}, []sample{{5, 12}, {6, 45}, {7, -24}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "c"), []sample{{100, 1}, {300, -3}, {400, 43}}),
},
{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 6}, {7, 7}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 66}}), // Overlap samples for some reason.
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{100, 1}, {300, 3}, {400, 4}}),
},
},
}
t.Run("group by", func(t *testing.T) {
for _, c := range []struct {
query string
storeAPI storepb.StoreServer
mint, maxt int64
matchers []*labels.Matcher
replicaLabels []string
hints *storage.SelectHints
equivalentQuery string
expected []series
expectedAfterDedup series
expectedWarning string
}{
{
name: "select overlapping data with partial error",
storeAPI: &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 6}, {7, 7}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 66}}), // Overlap samples for some reason.
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{100, 1}, {300, 3}, {400, 4}}),
},
},
mint: 1, maxt: 300,
replicaLabels: []string{"a"},
equivalentQuery: `{a=~"a|b|c"}`,
expected: []series{
{
lset: labels.FromStrings("a", "a"),
samples: []sample{{2, 1}, {3, 2}, {5, 5}, {6, 6}, {7, 7}},
},
{
lset: labels.FromStrings("a", "b"),
samples: []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
{
lset: labels.FromStrings("a", "c"),
samples: []sample{{100, 1}, {300, 3}},
},
},
expectedAfterDedup: series{
lset: labels.Labels{},
// We don't expect correctness here, it's just random non-replica data.
samples: []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
expectedWarning: "partial error",
},
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
e := promql.NewEngine(promql.EngineOpts{
Logger: logger,
Timeout: timeout,
MaxSamples: math.MaxInt64,
})
t.Run("range", func(t *testing.T) {
q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
testutil.Ok(t, err)
r := q.Exec(context.Background())
testutil.Ok(t, r.Err)
testutil.Assert(t, len(r.Warnings) == 0)
vec, err := r.Matrix()
testutil.Ok(t, err)
testutil.Equals(t, promql.Matrix{
{Metric: labels.FromStrings(), Points: []promql.Point{
{T: 1587690300000, V: 13.652631578947368}, {T: 1587690400000, V: 14.049122807017545}, {T: 1587690500000, V: 13.961403508771928}, {T: 1587690600000, V: 13.617543859649123}, {T: 1587690700000, V: 14.568421052631578}, {T: 1587690800000, V: 14.989473684210525},
{T: 1587690900000, V: 16.2}, {T: 1587691000000, V: 16.052631578947366}, {T: 1587691100000, V: 15.831578947368419}, {T: 1587691200000, V: 15.659649122807016}, {T: 1587691300000, V: 14.842105263157894}, {T: 1587691400000, V: 14.003508771929825},
{T: 1587691500000, V: 13.782456140350877}, {T: 1587691600000, V: 13.863157894736842}, {T: 1587691700000, V: 15.270282598474374}, {T: 1587691800000, V: 14.343859649122805}, {T: 1587691900000, V: 13.975438596491228}, {T: 1587692000000, V: 13.4},
{T: 1587692100000, V: 14.087719298245615}, {T: 1587692200000, V: 14.39298245614035}, {T: 1587692300000, V: 15.024561403508772}, {T: 1587692400000, V: 14.073684210526313}, {T: 1587692500000, V: 9.3772165751634}, {T: 1587692600000, V: 6.378947368421052},
{T: 1587692700000, V: 8.19298245614035}, {T: 1587692800000, V: 11.918703026416258}, {T: 1587692900000, V: 13.75813610765101}, {T: 1587693000000, V: 13.087719298245615}, {T: 1587693100000, V: 13.466666666666667}, {T: 1587693200000, V: 14.028070175438595},
{T: 1587693300000, V: 14.23859649122807}, {T: 1587693400000, V: 15.407017543859647}, {T: 1587693500000, V: 15.915789473684208}, {T: 1587693600000, V: 15.712280701754386},
}},
{Metric: labels.FromStrings(), Points: []promql.Point{
{T: 1587690300000, V: 13.69122807017544}, {T: 1587690400000, V: 14.098245614035086}, {T: 1587690500000, V: 13.905263157894735}, {T: 1587690600000, V: 13.617543859649123}, {T: 1587690700000, V: 14.350877192982455}, {T: 1587690800000, V: 15.003508771929825},
{T: 1587690900000, V: 16.12280701754386}, {T: 1587691000000, V: 16.049122807017543}, {T: 1587691100000, V: 15.922807017543859}, {T: 1587691200000, V: 15.63157894736842}, {T: 1587691300000, V: 14.982456140350878}, {T: 1587691400000, V: 14.187259188557551},
{T: 1587691500000, V: 13.828070175438594}, {T: 1587691600000, V: 13.971929824561403}, {T: 1587691700000, V: 15.31994329585807}, {T: 1587691800000, V: 14.30877192982456}, {T: 1587691900000, V: 13.915789473684212}, {T: 1587692000000, V: 13.312280701754384},
{T: 1587692100000, V: 14.136842105263158}, {T: 1587692200000, V: 14.39298245614035}, {T: 1587692300000, V: 15.014035087719297}, {T: 1587692400000, V: 14.112280701754386}, {T: 1587692500000, V: 9.421065148148147}, {T: 1587692600000, V: 6.421368067203301},
{T: 1587692700000, V: 8.252631578947367}, {T: 1587692800000, V: 11.721237543747266}, {T: 1587692900000, V: 13.842105263157894}, {T: 1587693000000, V: 13.153509064307995}, {T: 1587693100000, V: 13.378947368421052}, {T: 1587693200000, V: 14.03157894736842},
{T: 1587693300000, V: 14.147368421052631}, {T: 1587693400000, V: 15.343159785693985}, {T: 1587693500000, V: 15.90877192982456}, {T: 1587693600000, V: 15.761403508771927},
}},
}, vec)
})
})
}
*/
121 changes: 121 additions & 0 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,132 @@
package query

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }

func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("proxy", func(t *testing.T) {
var clients []store.Client
q := NewQueryableCreator(
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return clients },
component.Debug, nil, 5*time.Minute),
1000000,
5*time.Minute,
)

createQueryableFn := func(stores []*testStore) storage.Queryable {
clients = clients[:0]
for i, st := range stores {
m, err := storepb.PromMatchersToMatchers(st.matchers...)
testutil.Ok(t, err)

clients = append(clients, inProcessClient{
t: t,
StoreClient: storepb.ServerAsClient(SelectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, st.extLset), m, st.mint, st.maxt), 0),
name: fmt.Sprintf("store number %v", i),
extLset: st.extLset,
})
}
return q(true, nil, nil, 0, false, false)
}

for _, fn := range files {
t.Run(fn, func(t *testing.T) {
te, err := newTestFromFile(t, fn)
testutil.Ok(t, err)
testutil.Ok(t, te.run(createQueryableFn))
te.close()
})
}
})
}

// SelectStore allows wrapping another storeAPI with additional time and matcher selection.
type SelectStore struct {
matchers []storepb.LabelMatcher

storepb.StoreServer
mint, maxt int64
}

// SelectedStore wraps given store with SelectStore.
func SelectedStore(wrapped storepb.StoreServer, matchers []storepb.LabelMatcher, mint, maxt int64) *SelectStore {
return &SelectStore{
StoreServer: wrapped,
matchers: matchers,
mint: mint,
maxt: maxt,
}
}

func (s *SelectStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
resp, err := s.StoreServer.Info(ctx, r)
if err != nil {
return nil, err
}
if resp.MinTime < s.mint {
resp.MinTime = s.mint
}
if resp.MaxTime > s.maxt {
resp.MaxTime = s.maxt
}
// TODO(bwplotka): Match labelsets and expose only those?
return resp, nil
}

func (s *SelectStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if r.MinTime < s.mint {
r.MinTime = s.mint
}
if r.MaxTime > s.maxt {
r.MaxTime = s.maxt
}
r.Matchers = append(r.Matchers, s.matchers...)
return s.StoreServer.Series(r, srv)
}
Loading

0 comments on commit 81af0bd

Please sign in to comment.