diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index ad764cddfc5..642a0e601b6 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -294,6 +294,136 @@ func (s series) Iterator() chunkenc.Iterator { return newMockedSeriesIterator(s.samples) } +// TestQuerier_Select_After_promql tests expected results with and without deduplication after passing all data to promql. +// To test with real data: +// Collect the expected results from Prometheus or Thanos through "/api/v1/query_range" and save to a file. +// Collect raw data to be used for local storage: +// scripts/insecure_grpcurl_series.sh queriesGrpcIP:port '[{"name": "__name__", "value":"cluster_version"},{"name":"_id","value":"xxx"}]' 1597823000000 1597824600000 > localStorage.json +// Remove all white space from the file and put each series in a new line. +// When collecting the raw data mint should be Prometheus query time minus the default look back delta(default is 5min or 300000ms) +// For example if the Prometheus query mint is 1597823700000 the grpccurl query mint should be 1597823400000. +// This is because when promql displays data for a given range it looks back 5min before the requested time window. +func TestQuerier_Select_After_promql(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + + for _, tcase := range []struct { + name string + storeAPI storepb.StoreServer + replicaLabels []string // Replica label groups chunks by the label value and strips it from the final result. + hints *storage.SelectHints + equivalentQuery string + + expected []series + expectedAfterDedup series + expectedWarning string + }{ + + { + // Simulate Prom with 1m scrape interval scraping 30s apart. + // This should start with replica-1 until a brief outage, + // then switch to replica-2 after not seeing a value for 2 * interval = 120s. + name: "switch to replica 2 after an outage", + storeAPI: &storeServer{ + resps: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 1}, {60000, 3}, {120000, 5} /* outage for 3 minutes */, {300000, 11}, {360000, 13}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{30000, 2}, {90000, 5}, {150000, 6}, {210000, 8}, {270000, 10}, {330000, 12}}), + }, + }, + hints: &storage.SelectHints{ + Start: 0, + End: 360000, + Step: 60000, + }, + replicaLabels: []string{"a"}, + equivalentQuery: `{a=~"a|b"}`, + expected: []series{ + { + lset: labels.FromStrings("a", "a"), + samples: []sample{{0, 1}, {60000, 3}, {120000, 5}, {t: 180000, v: 5}, {t: 240000, v: 5}, {t: 300000, v: 11}, {t: 360000, v: 13}}, + }, + { + lset: labels.FromStrings("a", "b"), + samples: []sample{{t: 60000, v: 2}, {t: 120000, v: 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}}, + }, + }, + expectedAfterDedup: series{ + lset: labels.Labels{}, + samples: []sample{{0, 1}, {60000, 2}, {120000, 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}}, + }, + }, + + { + // // Regression test against https://github.com/thanos-io/thanos/issues/2890. + name: "when switching replicas make sure the time window between samples is never bigger then the lookback delta", + storeAPI: func() storepb.StoreServer { + s, err := store.NewLocalStoreFromJSONMmappableFile(logger, component.Debug, nil, "./testdata/issue2890-seriesresponses.json", store.ScanGRPCCurlProtoStreamMessages) + testutil.Ok(t, err) + return s + }(), + equivalentQuery: `cluster_version{}`, + replicaLabels: []string{"replica"}, + hints: &storage.SelectHints{ + Start: 1597823700000, + End: 1597824600000, + Step: 3000, + }, + expected: jsonToSeries(t, "testdata/issue2890-expected.json"), + expectedAfterDedup: jsonToSeries(t, "testdata/issue2890-expected-dedup.json")[0], + }, + } { + timeout := 5 * time.Minute + e := promql.NewEngine(promql.EngineOpts{ + Logger: logger, + Timeout: timeout, + MaxSamples: math.MaxInt64, + }) + + t.Run(tcase.name, func(t *testing.T) { + for _, sc := range []struct { + dedup bool + expected []series + }{ + {dedup: false, expected: tcase.expected}, + {dedup: true, expected: []series{tcase.expectedAfterDedup}}, + } { + + resolution := time.Duration(tcase.hints.Step) * time.Millisecond + t.Run(fmt.Sprintf("dedup=%v, resolution=%v", sc.dedup, resolution.String()), func(t *testing.T) { + var actual []series + // Boostrap a local store and pass the data through promql. + { + g := gate.New(2) + mq := &mockedQueryable{ + Creator: func(mint, maxt int64) storage.Querier { + return newQuerier(context.Background(), nil, nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) + }, + } + t.Cleanup(func() { + testutil.Ok(t, mq.Close()) + }) + q, err := e.NewRangeQuery(mq, tcase.equivalentQuery, timestamp.Time(tcase.hints.Start), timestamp.Time(tcase.hints.End), resolution) + testutil.Ok(t, err) + t.Cleanup(q.Close) + res := q.Exec(context.Background()) + testutil.Ok(t, res.Err) + actual = promqlResToSeries(res) + if tcase.expectedWarning != "" { + warns := res.Warnings + testutil.Assert(t, len(warns) == 1, "expected only single warnings") + testutil.Equals(t, tcase.expectedWarning, warns[0].Error()) + } + } + + testutil.Equals(t, sc.expected, actual, "promql result doesn't match the expected output") + if sc.dedup { + testutil.Assert(t, len(actual) == 1, "expected only single response, subqueries?") + } + }) + } + }) + } +} + func TestQuerier_Select(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) @@ -578,136 +708,6 @@ func testSelectResponse(t *testing.T, expected []series, res storage.SeriesSet) } } -// TestQuerier_Select_After_promql tests expected results with and without deduplication after passing all data to promql. -// To test with real data: -// Collect the expected results from Prometheus or Thanos through "/api/v1/query_range" and save to a file. -// Collect raw data to be used for local storage: -// scripts/insecure_grpcurl_series.sh queriesGrpcIP:port '[{"name": "__name__", "value":"cluster_version"},{"name":"_id","value":"xxx"}]' 1597823000000 1597824600000 > localStorage.json -// Remove all white space from the file and put each series in a new line. -// When collecting the raw data mint should be Prometheus query time minus the default look back delta(default is 5min or 300000ms) -// For example if the Prometheus query mint is 1597823700000 the grpccurl query mint should be 1597823400000. -// This is because when promql displays data for a given range it looks back 5min before the requested time window. -func TestQuerier_Select_After_promql(t *testing.T) { - logger := log.NewLogfmtLogger(os.Stderr) - - for _, tcase := range []struct { - name string - storeAPI storepb.StoreServer - replicaLabels []string // Replica label groups chunks by the label value and strips it from the final result. - hints *storage.SelectHints - equivalentQuery string - - expected []series - expectedAfterDedup series - expectedWarning string - }{ - - { - // Simulate Prom with 1m scrape interval scraping 30s apart. - // This should start with replica-1 until a brief outage, - // then switch to replica-2 after not seeing a value for 2 * interval = 120s. - name: "switch to replica 2 after an outage", - storeAPI: &storeServer{ - resps: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 1}, {60000, 3}, {120000, 5} /* outage for 3 minutes */, {300000, 11}, {360000, 13}}), - storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{30000, 2}, {90000, 5}, {150000, 6}, {210000, 8}, {270000, 10}, {330000, 12}}), - }, - }, - hints: &storage.SelectHints{ - Start: 0, - End: 360000, - Step: 60000, - }, - replicaLabels: []string{"a"}, - equivalentQuery: `{a=~"a|b"}`, - expected: []series{ - { - lset: labels.FromStrings("a", "a"), - samples: []sample{{0, 1}, {60000, 3}, {120000, 5}, {t: 180000, v: 5}, {t: 240000, v: 5}, {t: 300000, v: 11}, {t: 360000, v: 13}}, - }, - { - lset: labels.FromStrings("a", "b"), - samples: []sample{{t: 60000, v: 2}, {t: 120000, v: 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}}, - }, - }, - expectedAfterDedup: series{ - lset: labels.Labels{}, - samples: []sample{{0, 1}, {60000, 2}, {120000, 5}, {t: 180000, v: 6}, {t: 240000, v: 8}, {t: 300000, v: 10}, {t: 360000, v: 12}}, - }, - }, - - { - // // Regression test against https://github.com/thanos-io/thanos/issues/2890. - name: "when switching replicas make sure the time window between samples is never bigger then the lookback delta", - storeAPI: func() storepb.StoreServer { - s, err := store.NewLocalStoreFromJSONMmappableFile(logger, component.Debug, nil, "./testdata/issue2890-seriesresponses.json", store.ScanGRPCCurlProtoStreamMessages) - testutil.Ok(t, err) - return s - }(), - equivalentQuery: `cluster_version{}`, - replicaLabels: []string{"replica"}, - hints: &storage.SelectHints{ - Start: 1597823700000, - End: 1597824600000, - Step: 3000, - }, - expected: jsonToSeries(t, "testdata/issue2890-expected.json"), - expectedAfterDedup: jsonToSeries(t, "testdata/issue2890-expected-dedup.json")[0], - }, - } { - timeout := 5 * time.Minute - e := promql.NewEngine(promql.EngineOpts{ - Logger: logger, - Timeout: timeout, - MaxSamples: math.MaxInt64, - }) - - t.Run(tcase.name, func(t *testing.T) { - for _, sc := range []struct { - dedup bool - expected []series - }{ - {dedup: false, expected: tcase.expected}, - {dedup: true, expected: []series{tcase.expectedAfterDedup}}, - } { - - resolution := time.Duration(tcase.hints.Step) * time.Millisecond - t.Run(fmt.Sprintf("dedup=%v, resolution=%v", sc.dedup, resolution.String()), func(t *testing.T) { - var actual []series - // Boostrap a local store and pass the data through promql. - { - g := gate.New(2) - mq := &mockedQueryable{ - Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout) - }, - } - t.Cleanup(func() { - testutil.Ok(t, mq.Close()) - }) - q, err := e.NewRangeQuery(mq, tcase.equivalentQuery, timestamp.Time(tcase.hints.Start), timestamp.Time(tcase.hints.End), resolution) - testutil.Ok(t, err) - t.Cleanup(q.Close) - res := q.Exec(context.Background()) - testutil.Ok(t, res.Err) - actual = promqlResToSeries(res) - if tcase.expectedWarning != "" { - warns := res.Warnings - testutil.Assert(t, len(warns) == 1, "expected only single warnings") - testutil.Equals(t, tcase.expectedWarning, warns[0].Error()) - } - } - - testutil.Equals(t, sc.expected, actual, "promql result doesn't match the expected output") - if sc.dedup { - testutil.Assert(t, len(actual) == 1, "expected only single response, subqueries?") - } - }) - } - }) - } -} - func jsonToSeries(t *testing.T, filename string) []series { file, err := ioutil.ReadFile(filename) testutil.Ok(t, err) @@ -770,29 +770,6 @@ func promqlResToSeries(res *promql.Result) []series { return series } -type querierResponseCatcher struct { - storage.Querier - t testing.TB - - resp []storage.SeriesSet -} - -func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) storage.SeriesSet { - s := q.Querier.Select(selectSorted, p, m...) - q.resp = append(q.resp, s) - return storage.NoopSeriesSet() -} - -func (q querierResponseCatcher) Close() error { return nil } - -func (q *querierResponseCatcher) warns() []storage.Warnings { - var warns []storage.Warnings - for _, r := range q.resp { - warns = append(warns, r.Warnings()) - } - return warns -} - type mockedQueryable struct { Creator func(int64, int64) storage.Querier querier storage.Querier @@ -820,6 +797,29 @@ func (q *mockedQueryable) Close() error { return nil } +type querierResponseCatcher struct { + storage.Querier + t testing.TB + + resp []storage.SeriesSet +} + +func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) storage.SeriesSet { + s := q.Querier.Select(selectSorted, p, m...) + q.resp = append(q.resp, s) + return storage.NoopSeriesSet() +} + +func (q querierResponseCatcher) Close() error { return nil } + +func (q *querierResponseCatcher) warns() []storage.Warnings { + var warns []storage.Warnings + for _, r := range q.resp { + warns = append(warns, r.Warnings()) + } + return warns +} + // TODO(bwplotka): Reuse SeriesSets from chunk iterators from Prometheus. type mockedSeriesSet struct { series []series