Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: Added PromQL tests framework supporing multiple stores. First step towars Query pushdown! #3631

Merged
merged 2 commits into from
Dec 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func runRule(

// Start gRPC server.
{
tsdbStore := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, lset)

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand All @@ -684,7 +684,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
return chk, err
}

// Handle counters by reading them properly.
// Handle counters by applying resets directly.
acs := make([]chunkenc.Iterator, 0, len(chks))
for _, achk := range chks {
c, err := achk.Get(AggrCounter)
Expand Down Expand Up @@ -580,6 +580,7 @@ type sample struct {
// It handles overlapped chunks (removes overlaps).
// NOTE: It is important to deduplicate with care ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
// NOTE(bwplotka): This hides resets from PromQL engine. This means it will not work for PromQL resets function.
type ApplyCounterResetsSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
Expand Down
4 changes: 2 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,12 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.MatchersToString(matchers...))
q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand Down
1 change: 1 addition & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (s *chunkSeries) Iterator() chunkenc.Iterator {
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
}
// TODO(bwplotka): This breaks resets function. See https://github.com/thanos-io/thanos/issues/3644
sit = downsample.NewApplyCounterResetsIterator(its...)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
Expand Down
3 changes: 2 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
sms, err := storepb.TranslatePromMatchers(ms...)
sms, err := storepb.PromMatchersToMatchers(ms...)
if err != nil {
return nil, errors.Wrap(err, "convert matchers")
}
Expand All @@ -265,6 +265,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
Expand Down
11 changes: 5 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand All @@ -41,7 +40,7 @@ type sample struct {
}

func TestQueryableCreator_MaxResolution(t *testing.T) {
testProxy := &storeServer{resps: []*storepb.SeriesResponse{}}
testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}}
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
Expand All @@ -60,7 +59,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {

// Tests E2E how PromQL works with downsampled data.
func TestQuerier_DownsampledData(t *testing.T) {
testProxy := &storeServer{
testProxy := &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store.
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b", "bbbb", "eee"), []sample{{99, 3}, {199, 8}}), // Downsampled chunk from Store.
Expand Down Expand Up @@ -411,7 +410,7 @@ func TestQuerier_Select(t *testing.T) {
}{
{
name: "select overlapping data with partial error",
storeAPI: &storeServer{
storeAPI: &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
Expand Down Expand Up @@ -1468,14 +1467,14 @@ func BenchmarkDedupSeriesIterator(b *testing.B) {
})
}

type storeServer struct {
type testStoreServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.StoreServer

resps []*storepb.SeriesResponse
}

func (s *storeServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *testStoreServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
for _, resp := range s.resps {
err := srv.Send(resp)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,132 @@
package query

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }

func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
testutil.Equals(t, 10, len(files), "%v", files)

logger := log.NewLogfmtLogger(os.Stderr)
t.Run("proxy", func(t *testing.T) {
var clients []store.Client
q := NewQueryableCreator(
logger,
nil,
store.NewProxyStore(logger, nil, func() []store.Client { return clients },
component.Debug, nil, 5*time.Minute),
1000000,
5*time.Minute,
)

createQueryableFn := func(stores []*testStore) storage.Queryable {
clients = clients[:0]
for i, st := range stores {
m, err := storepb.PromMatchersToMatchers(st.matchers...)
testutil.Ok(t, err)

// TODO(bwplotka): Parse external labels.
clients = append(clients, inProcessClient{
t: t,
StoreClient: storepb.ServerAsClient(SelectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), 0),
name: fmt.Sprintf("store number %v", i),
})
}
return q(true, nil, nil, 0, false, false)
}

for _, fn := range files {
t.Run(fn, func(t *testing.T) {
te, err := newTestFromFile(t, fn)
testutil.Ok(t, err)
testutil.Ok(t, te.run(createQueryableFn))
te.close()
})
}
})
}

// SelectStore allows wrapping another storeAPI with additional time and matcher selection.
type SelectStore struct {
matchers []storepb.LabelMatcher

storepb.StoreServer
mint, maxt int64
}

// SelectedStore wraps given store with SelectStore.
func SelectedStore(wrapped storepb.StoreServer, matchers []storepb.LabelMatcher, mint, maxt int64) *SelectStore {
return &SelectStore{
StoreServer: wrapped,
matchers: matchers,
mint: mint,
maxt: maxt,
}
}

func (s *SelectStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
resp, err := s.StoreServer.Info(ctx, r)
if err != nil {
return nil, err
}
if resp.MinTime < s.mint {
resp.MinTime = s.mint
}
if resp.MaxTime > s.maxt {
resp.MaxTime = s.maxt
}
// TODO(bwplotka): Match labelsets and expose only those?
return resp, nil
}

func (s *SelectStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if r.MinTime < s.mint {
r.MinTime = s.mint
}
if r.MaxTime > s.maxt {
r.MaxTime = s.maxt
}
r.Matchers = append(r.Matchers, s.matchers...)
return s.StoreServer.Series(r, srv)
}
22 changes: 11 additions & 11 deletions pkg/query/storeset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,29 @@ var testGRPCOpts = []grpc.DialOption{
grpc.WithInsecure(),
}

type testStore struct {
type mockedStore struct {
infoDelay time.Duration
info storepb.InfoResponse
}

func (s *testStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
func (s *mockedStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
if s.infoDelay > 0 {
time.Sleep(s.infoDelay)
}
return &s.info, nil
}

func (s *testStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *mockedStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
return status.Error(codes.Unimplemented, "not implemented")
}

func (s *testStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
func (s *mockedStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
}

func (s *testStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
func (s *mockedStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
*storepb.LabelValuesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
Expand Down Expand Up @@ -84,7 +84,7 @@ func startTestStores(storeMetas []testStoreMeta) (*testStores, error) {

srv := grpc.NewServer()

storeSrv := &testStore{
storeSrv := &mockedStore{
info: storepb.InfoResponse{
LabelSets: meta.extlsetFn(listener.Addr().String()),
MaxTime: meta.maxTime,
Expand Down Expand Up @@ -1012,12 +1012,12 @@ func TestUpdateStoreStateLastError(t *testing.T) {
storeStatuses: map[string]*StoreStatus{},
}
mockStoreRef := &storeRef{
addr: "testStore",
addr: "mockedStore",
}

mockStoreSet.updateStoreStatus(mockStoreRef, tc.InputError)

b, err := json.Marshal(mockStoreSet.storeStatuses["testStore"].LastError)
b, err := json.Marshal(mockStoreSet.storeStatuses["mockedStore"].LastError)
testutil.Ok(t, err)
testutil.Equals(t, tc.ExpectedLastErr, string(b))
}
Expand All @@ -1028,19 +1028,19 @@ func TestUpdateStoreStateForgetsPreviousErrors(t *testing.T) {
storeStatuses: map[string]*StoreStatus{},
}
mockStoreRef := &storeRef{
addr: "testStore",
addr: "mockedStore",
}

mockStoreSet.updateStoreStatus(mockStoreRef, errors.New("test err"))

b, err := json.Marshal(mockStoreSet.storeStatuses["testStore"].LastError)
b, err := json.Marshal(mockStoreSet.storeStatuses["mockedStore"].LastError)
testutil.Ok(t, err)
testutil.Equals(t, `"test err"`, string(b))

// updating status without and error should clear the previous one.
mockStoreSet.updateStoreStatus(mockStoreRef, nil)

b, err = json.Marshal(mockStoreSet.storeStatuses["testStore"].LastError)
b, err = json.Marshal(mockStoreSet.storeStatuses["mockedStore"].LastError)
testutil.Ok(t, err)
testutil.Equals(t, `null`, string(b))
}
Loading