Skip to content

Commit

Permalink
Added initial PromQL acceptance tests.
Browse files Browse the repository at this point in the history
* Improved logging.
* Improved debug matching info.
* Fixed important matching bug.
* Found reset bug, added issue #3644 and commented code.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Dec 18, 2020
1 parent 3c3f2e2 commit 9b86020
Show file tree
Hide file tree
Showing 34 changed files with 3,793 additions and 207 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func runRule(

// Start gRPC server.
{
tsdbStore := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, lset)

tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand All @@ -684,7 +684,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryEngine: func(int64) *promql.Engine {
return qe
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
return chk, err
}

// Handle counters by reading them properly.
// Handle counters by applying resets directly.
acs := make([]chunkenc.Iterator, 0, len(chks))
for _, achk := range chks {
c, err := achk.Get(AggrCounter)
Expand Down Expand Up @@ -580,6 +580,7 @@ type sample struct {
// It handles overlapped chunks (removes overlaps).
// NOTE: It is important to deduplicate with care ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
// NOTE(bwplotka): This hides resets from PromQL engine. This means it will not work for PromQL resets function.
type ApplyCounterResetsSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
Expand Down
4 changes: 2 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,12 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.MatchersToString(matchers...))
q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand Down
1 change: 1 addition & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (s *chunkSeries) Iterator() chunkenc.Iterator {
for _, c := range s.chunks {
its = append(its, getFirstIterator(c.Counter, c.Raw))
}
// TODO(bwplotka): This breaks resets function. See https://github.com/thanos-io/thanos/issues/3644
sit = downsample.NewApplyCounterResetsIterator(its...)
default:
return errSeriesIterator{err: errors.Errorf("unexpected result aggregate type %v", s.aggrs)}
Expand Down
3 changes: 2 additions & 1 deletion pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
sms, err := storepb.TranslatePromMatchers(ms...)
sms, err := storepb.PromMatchersToMatchers(ms...)
if err != nil {
return nil, errors.Wrap(err, "convert matchers")
}
Expand All @@ -265,6 +265,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
// TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers)

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
Expand Down
11 changes: 5 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand All @@ -41,7 +40,7 @@ type sample struct {
}

func TestQueryableCreator_MaxResolution(t *testing.T) {
testProxy := &storeServer{resps: []*storepb.SeriesResponse{}}
testProxy := &testStoreServer{resps: []*storepb.SeriesResponse{}}
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
Expand All @@ -60,7 +59,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {

// Tests E2E how PromQL works with downsampled data.
func TestQuerier_DownsampledData(t *testing.T) {
testProxy := &storeServer{
testProxy := &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "a", "aaa", "bbb"), []sample{{99, 1}, {199, 5}}), // Downsampled chunk from Store.
storeSeriesResponse(t, labels.FromStrings("__name__", "a", "zzz", "b", "bbbb", "eee"), []sample{{99, 3}, {199, 8}}), // Downsampled chunk from Store.
Expand Down Expand Up @@ -411,7 +410,7 @@ func TestQuerier_Select(t *testing.T) {
}{
{
name: "select overlapping data with partial error",
storeAPI: &storeServer{
storeAPI: &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
Expand Down Expand Up @@ -1468,14 +1467,14 @@ func BenchmarkDedupSeriesIterator(b *testing.B) {
})
}

type storeServer struct {
type testStoreServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.StoreServer

resps []*storepb.SeriesResponse
}

func (s *storeServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *testStoreServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
for _, resp := range s.resps {
err := srv.Send(resp)
if err != nil {
Expand Down
133 changes: 133 additions & 0 deletions pkg/query/query_reducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package query

/**
// TODO(bwplotka): Move to separate package?
func TestQuerierWithMapReducing(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
timeout := 100 * time.Second
// Readable example of extemely shared store APIs and their responses.
// NOTE: Series are sorted within label sets. Within series samples are sorted by time. This is all by StoreAPI contract.
// Non overlapping samples.
storeResponsesShardedBySomeSeriesByTime := [][][]*storepb.SeriesResponse{
// Vertical shards (series).
{
// Horizontal shards (time).
{
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "a"), []sample{{0, 0}, {2, 3}, {3, 51}}),
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "a"), []sample{{5, 65}, {6, 24}, {7, 321}}), // Reset.
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "b"), []sample{{2, 4}, {3, 24}, {4, 33}}, []sample{{5, 77}, {6, 94}, {7, 360}}),
storeSeriesResponse(t, labels.FromStrings("_type", "counter", "a", "c"), []sample{{100, 1}, {300, 3}, {400, 54}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "a"), []sample{{0, -24}, {2, 53}, {3, -2}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "a"), []sample{{5, 10}, {6, 23}, {7, -30}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "b"), []sample{{2, 24}, {3, -12}, {4, 4}}, []sample{{5, 12}, {6, 45}, {7, -24}}),
storeSeriesResponse(t, labels.FromStrings("_type", "gauge", "a", "c"), []sample{{100, 1}, {300, -3}, {400, 43}}),
},
{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 6}, {7, 7}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 66}}), // Overlap samples for some reason.
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{100, 1}, {300, 3}, {400, 4}}),
},
},
}
t.Run("group by", func(t *testing.T) {
for _, c := range []struct {
query string
storeAPI storepb.StoreServer
mint, maxt int64
matchers []*labels.Matcher
replicaLabels []string
hints *storage.SelectHints
equivalentQuery string
expected []series
expectedAfterDedup series
expectedWarning string
}{
{
name: "select overlapping data with partial error",
storeAPI: &testStoreServer{
resps: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storepb.NewWarnSeriesResponse(errors.New("partial error")),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 6}, {7, 7}}),
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 66}}), // Overlap samples for some reason.
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{100, 1}, {300, 3}, {400, 4}}),
},
},
mint: 1, maxt: 300,
replicaLabels: []string{"a"},
equivalentQuery: `{a=~"a|b|c"}`,
expected: []series{
{
lset: labels.FromStrings("a", "a"),
samples: []sample{{2, 1}, {3, 2}, {5, 5}, {6, 6}, {7, 7}},
},
{
lset: labels.FromStrings("a", "b"),
samples: []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
{
lset: labels.FromStrings("a", "c"),
samples: []sample{{100, 1}, {300, 3}},
},
},
expectedAfterDedup: series{
lset: labels.Labels{},
// We don't expect correctness here, it's just random non-replica data.
samples: []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
expectedWarning: "partial error",
},
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
e := promql.NewEngine(promql.EngineOpts{
Logger: logger,
Timeout: timeout,
MaxSamples: math.MaxInt64,
})
t.Run("range", func(t *testing.T) {
q, err := e.NewRangeQuery(&mockedQueryable{querier: q}, `rate(gitlab_transaction_cache_read_hit_count_total[5m])`, timestamp.Time(realSeriesWithStaleMarkerMint).Add(5*time.Minute), timestamp.Time(realSeriesWithStaleMarkerMaxt), 100*time.Second)
testutil.Ok(t, err)
r := q.Exec(context.Background())
testutil.Ok(t, r.Err)
testutil.Assert(t, len(r.Warnings) == 0)
vec, err := r.Matrix()
testutil.Ok(t, err)
testutil.Equals(t, promql.Matrix{
{Metric: labels.FromStrings(), Points: []promql.Point{
{T: 1587690300000, V: 13.652631578947368}, {T: 1587690400000, V: 14.049122807017545}, {T: 1587690500000, V: 13.961403508771928}, {T: 1587690600000, V: 13.617543859649123}, {T: 1587690700000, V: 14.568421052631578}, {T: 1587690800000, V: 14.989473684210525},
{T: 1587690900000, V: 16.2}, {T: 1587691000000, V: 16.052631578947366}, {T: 1587691100000, V: 15.831578947368419}, {T: 1587691200000, V: 15.659649122807016}, {T: 1587691300000, V: 14.842105263157894}, {T: 1587691400000, V: 14.003508771929825},
{T: 1587691500000, V: 13.782456140350877}, {T: 1587691600000, V: 13.863157894736842}, {T: 1587691700000, V: 15.270282598474374}, {T: 1587691800000, V: 14.343859649122805}, {T: 1587691900000, V: 13.975438596491228}, {T: 1587692000000, V: 13.4},
{T: 1587692100000, V: 14.087719298245615}, {T: 1587692200000, V: 14.39298245614035}, {T: 1587692300000, V: 15.024561403508772}, {T: 1587692400000, V: 14.073684210526313}, {T: 1587692500000, V: 9.3772165751634}, {T: 1587692600000, V: 6.378947368421052},
{T: 1587692700000, V: 8.19298245614035}, {T: 1587692800000, V: 11.918703026416258}, {T: 1587692900000, V: 13.75813610765101}, {T: 1587693000000, V: 13.087719298245615}, {T: 1587693100000, V: 13.466666666666667}, {T: 1587693200000, V: 14.028070175438595},
{T: 1587693300000, V: 14.23859649122807}, {T: 1587693400000, V: 15.407017543859647}, {T: 1587693500000, V: 15.915789473684208}, {T: 1587693600000, V: 15.712280701754386},
}},
{Metric: labels.FromStrings(), Points: []promql.Point{
{T: 1587690300000, V: 13.69122807017544}, {T: 1587690400000, V: 14.098245614035086}, {T: 1587690500000, V: 13.905263157894735}, {T: 1587690600000, V: 13.617543859649123}, {T: 1587690700000, V: 14.350877192982455}, {T: 1587690800000, V: 15.003508771929825},
{T: 1587690900000, V: 16.12280701754386}, {T: 1587691000000, V: 16.049122807017543}, {T: 1587691100000, V: 15.922807017543859}, {T: 1587691200000, V: 15.63157894736842}, {T: 1587691300000, V: 14.982456140350878}, {T: 1587691400000, V: 14.187259188557551},
{T: 1587691500000, V: 13.828070175438594}, {T: 1587691600000, V: 13.971929824561403}, {T: 1587691700000, V: 15.31994329585807}, {T: 1587691800000, V: 14.30877192982456}, {T: 1587691900000, V: 13.915789473684212}, {T: 1587692000000, V: 13.312280701754384},
{T: 1587692100000, V: 14.136842105263158}, {T: 1587692200000, V: 14.39298245614035}, {T: 1587692300000, V: 15.014035087719297}, {T: 1587692400000, V: 14.112280701754386}, {T: 1587692500000, V: 9.421065148148147}, {T: 1587692600000, V: 6.421368067203301},
{T: 1587692700000, V: 8.252631578947367}, {T: 1587692800000, V: 11.721237543747266}, {T: 1587692900000, V: 13.842105263157894}, {T: 1587693000000, V: 13.153509064307995}, {T: 1587693100000, V: 13.378947368421052}, {T: 1587693200000, V: 14.03157894736842},
{T: 1587693300000, V: 14.147368421052631}, {T: 1587693400000, V: 15.343159785693985}, {T: 1587693500000, V: 15.90877192982456}, {T: 1587693600000, V: 15.761403508771927},
}},
}, vec)
})
})
}
*/
Loading

0 comments on commit 9b86020

Please sign in to comment.