Skip to content

Commit b81d808

Browse files
authored
Refactor Cortex querier to use Queryable interface directly (#2060)
* New function now requires Queryable for store, instead of ChunkStore. This makes it easier to use non-chunk-based queryables. On the other hand, chunk-based queryables are still optimized in the way they were before (in mergeSeriesSets). Merged ingester_streaming_queryable.go into distributor_queryable.go. Merged unified_querier.go into querier.go. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't wait for results to arrive if context is done. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Rename parameters to NewQueryable. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Include empty set with error. Unify checks in seriesSetWithFirstSeries methods. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 1619938 commit b81d808

14 files changed

+308
-350
lines changed

pkg/compactor/compactor_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
339339
cfg := prepareConfig()
340340
cfg.ShardingEnabled = true
341341
cfg.ShardingRing.InstanceID = "compactor-1"
342+
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
342343
cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec())
343344

344345
c, tsdbCompactor, logs, _, cleanup := prepare(t, cfg, bucketClient)

pkg/cortex/modules.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (t *Cortex) stopDistributor() (err error) {
238238
}
239239

240240
func (t *Cortex) initQuerier(cfg *Config) (err error) {
241-
queryable, engine := querier.New(cfg.Querier, t.distributor, t.querierChunkStore)
241+
queryable, engine := querier.New(cfg.Querier, t.distributor, querier.NewChunkStoreQueryable(cfg.Querier, t.querierChunkStore))
242242
api := v1.NewAPI(
243243
engine,
244244
queryable,
@@ -439,7 +439,7 @@ func (t *Cortex) stopTableManager() error {
439439
func (t *Cortex) initRuler(cfg *Config) (err error) {
440440
cfg.Ruler.Ring.ListenPort = cfg.Server.GRPCListenPort
441441
cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKVState.getMemberlistKV
442-
queryable, engine := querier.New(cfg.Querier, t.distributor, t.querierChunkStore)
442+
queryable, engine := querier.New(cfg.Querier, t.distributor, querier.NewChunkStoreQueryable(cfg.Querier, t.querierChunkStore))
443443

444444
t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor, prometheus.DefaultRegisterer, util.Logger)
445445
if err != nil {

pkg/querier/chunk_store_queryable.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ func (q *chunkStoreQuerier) Select(sp *storage.SelectParams, matchers ...*labels
4444
return nil, nil, promql.ErrStorage{Err: err}
4545
}
4646

47-
return q.partitionChunks(chunks), nil, nil
47+
return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc), nil, nil
4848
}
4949

50-
func (q *chunkStoreQuerier) partitionChunks(chunks []chunk.Chunk) storage.SeriesSet {
50+
// Series in the returned set are sorted alphabetically by labels.
51+
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
5152
chunksBySeries := map[model.Fingerprint][]chunk.Chunk{}
5253
for _, c := range chunks {
5354
fp := client.Fingerprint(c.Metric)
@@ -59,9 +60,9 @@ func (q *chunkStoreQuerier) partitionChunks(chunks []chunk.Chunk) storage.Series
5960
series = append(series, &chunkSeries{
6061
labels: chunksBySeries[i][0].Metric,
6162
chunks: chunksBySeries[i],
62-
chunkIteratorFunc: q.chunkIteratorFunc,
63-
mint: q.mint,
64-
maxt: q.maxt,
63+
chunkIteratorFunc: iteratorFunc,
64+
mint: mint,
65+
maxt: maxt,
6566
})
6667
}
6768

@@ -95,3 +96,7 @@ func (s *chunkSeries) Labels() labels.Labels {
9596
func (s *chunkSeries) Iterator() storage.SeriesIterator {
9697
return s.chunkIteratorFunc(s.chunks, model.Time(s.mint), model.Time(s.maxt))
9798
}
99+
100+
func (s *chunkSeries) Chunks() []chunk.Chunk {
101+
return s.chunks
102+
}

pkg/querier/chunk_store_queryable_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package querier
33
import (
44
"context"
55
"fmt"
6+
"sort"
67
"testing"
78
"time"
89

@@ -65,3 +66,34 @@ func mkChunk(t require.TestingT, mint, maxt model.Time, step time.Duration, enco
6566
}
6667
return chunk.NewChunk(userID, fp, metric, pc, mint, maxt)
6768
}
69+
70+
func TestPartitionChunksOutputIsSortedByLabels(t *testing.T) {
71+
var allChunks []chunk.Chunk
72+
73+
const count = 10
74+
// go down, to add series in reversed order
75+
for i := count; i > 0; i-- {
76+
ch := mkChunk(t, model.Time(0), model.Time(1000), time.Millisecond, promchunk.Bigchunk)
77+
// mkChunk uses `foo` as metric name, so we rename metric to be unique
78+
ch.Metric[0].Value = fmt.Sprintf("%02d", i)
79+
80+
allChunks = append(allChunks, ch)
81+
}
82+
83+
res := partitionChunks(allChunks, 0, 1000, mergeChunks)
84+
85+
// collect labels from each series
86+
var seriesLabels []labels.Labels
87+
for res.Next() {
88+
seriesLabels = append(seriesLabels, res.At().Labels())
89+
}
90+
91+
require.Len(t, seriesLabels, count)
92+
require.True(t, sort.IsSorted(sortedByLabels(seriesLabels)))
93+
}
94+
95+
type sortedByLabels []labels.Labels
96+
97+
func (b sortedByLabels) Len() int { return len(b) }
98+
func (b sortedByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
99+
func (b sortedByLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 }

pkg/querier/distributor_queryable.go

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@ package querier
22

33
import (
44
"context"
5+
"sort"
56

67
"github.com/prometheus/common/model"
78
"github.com/prometheus/prometheus/pkg/labels"
89
"github.com/prometheus/prometheus/promql"
910
"github.com/prometheus/prometheus/storage"
11+
"github.com/weaveworks/common/user"
1012

1113
"github.com/cortexproject/cortex/pkg/ingester/client"
1214
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
15+
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
1316
)
1417

1518
// Distributor is the read interface to the distributor, made an interface here
@@ -22,13 +25,15 @@ type Distributor interface {
2225
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
2326
}
2427

25-
func newDistributorQueryable(distributor Distributor) storage.Queryable {
28+
func newDistributorQueryable(distributor Distributor, streaming bool, iteratorFn chunkIteratorFunc) storage.Queryable {
2629
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
2730
return &distributorQuerier{
2831
distributor: distributor,
2932
ctx: ctx,
3033
mint: mint,
3134
maxt: maxt,
35+
streaming: streaming,
36+
chunkIterFn: iteratorFn,
3237
}, nil
3338
})
3439
}
@@ -37,13 +42,25 @@ type distributorQuerier struct {
3742
distributor Distributor
3843
ctx context.Context
3944
mint, maxt int64
45+
streaming bool
46+
chunkIterFn chunkIteratorFunc
4047
}
4148

4249
func (q *distributorQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
43-
mint, maxt := q.mint, q.maxt
44-
if sp != nil {
45-
mint = sp.Start
46-
maxt = sp.End
50+
// Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation,
51+
// which needs only metadata.
52+
if sp == nil {
53+
ms, err := q.distributor.MetricsForLabelMatchers(q.ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
54+
if err != nil {
55+
return nil, nil, err
56+
}
57+
return metricsToSeriesSet(ms), nil, nil
58+
}
59+
60+
mint, maxt := sp.Start, sp.End
61+
62+
if q.streaming {
63+
return q.streamingSelect(*sp, matchers)
4764
}
4865

4966
matrix, err := q.distributor.Query(q.ctx, model.Time(mint), model.Time(maxt), matchers...)
@@ -54,6 +71,45 @@ func (q *distributorQuerier) Select(sp *storage.SelectParams, matchers ...*label
5471
return matrixToSeriesSet(matrix), nil, nil
5572
}
5673

74+
func (q *distributorQuerier) streamingSelect(sp storage.SelectParams, matchers []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
75+
userID, err := user.ExtractOrgID(q.ctx)
76+
if err != nil {
77+
return nil, nil, promql.ErrStorage{Err: err}
78+
}
79+
80+
mint, maxt := sp.Start, sp.End
81+
82+
results, err := q.distributor.QueryStream(q.ctx, model.Time(mint), model.Time(maxt), matchers...)
83+
if err != nil {
84+
return nil, nil, promql.ErrStorage{Err: err}
85+
}
86+
87+
serieses := make([]storage.Series, 0, len(results))
88+
for _, result := range results {
89+
// Sometimes the ingester can send series that have no data.
90+
if len(result.Chunks) == 0 {
91+
continue
92+
}
93+
94+
ls := client.FromLabelAdaptersToLabels(result.Labels)
95+
sort.Sort(ls)
96+
97+
chunks, err := chunkcompat.FromChunks(userID, ls, result.Chunks)
98+
if err != nil {
99+
return nil, nil, promql.ErrStorage{Err: err}
100+
}
101+
102+
series := &chunkSeries{
103+
labels: ls,
104+
chunks: chunks,
105+
chunkIteratorFunc: q.chunkIterFn,
106+
}
107+
serieses = append(serieses, series)
108+
}
109+
110+
return newConcreteSeriesSet(serieses), nil, nil
111+
}
112+
57113
func (q *distributorQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
58114
lv, err := q.distributor.LabelValuesForLabelName(q.ctx, model.LabelName(name))
59115
return lv, nil, err

pkg/querier/distributor_queryable_test.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@ import (
66

77
"github.com/prometheus/common/model"
88
"github.com/prometheus/prometheus/pkg/labels"
9+
"github.com/prometheus/prometheus/storage"
910
"github.com/stretchr/testify/require"
11+
"github.com/weaveworks/common/user"
1012

13+
"github.com/cortexproject/cortex/pkg/chunk"
14+
"github.com/cortexproject/cortex/pkg/chunk/encoding"
1115
"github.com/cortexproject/cortex/pkg/ingester/client"
1216
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
17+
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
1318
)
1419

1520
const (
@@ -32,11 +37,58 @@ func TestDistributorQuerier(t *testing.T) {
3237
},
3338
},
3439
}
35-
queryable := newDistributorQueryable(d)
40+
queryable := newDistributorQueryable(d, false, nil)
3641
querier, err := queryable.Querier(context.Background(), mint, maxt)
3742
require.NoError(t, err)
3843

39-
seriesSet, _, err := querier.Select(nil)
44+
seriesSet, _, err := querier.Select(&storage.SelectParams{Start: mint, End: maxt})
45+
require.NoError(t, err)
46+
47+
require.True(t, seriesSet.Next())
48+
series := seriesSet.At()
49+
require.Equal(t, labels.Labels{{Name: "bar", Value: "baz"}}, series.Labels())
50+
51+
require.True(t, seriesSet.Next())
52+
series = seriesSet.At()
53+
require.Equal(t, labels.Labels{{Name: "foo", Value: "bar"}}, series.Labels())
54+
55+
require.False(t, seriesSet.Next())
56+
require.NoError(t, seriesSet.Err())
57+
}
58+
59+
func TestIngesterStreaming(t *testing.T) {
60+
// We need to make sure that there is atleast one chunk present,
61+
// else no series will be selected.
62+
promChunk, err := encoding.NewForEncoding(encoding.Bigchunk)
63+
require.NoError(t, err)
64+
65+
clientChunks, err := chunkcompat.ToChunks([]chunk.Chunk{
66+
chunk.NewChunk("", 0, nil, promChunk, model.Earliest, model.Earliest),
67+
})
68+
require.NoError(t, err)
69+
70+
d := &mockDistributor{
71+
r: []client.TimeSeriesChunk{
72+
{
73+
Labels: []client.LabelAdapter{
74+
{Name: "bar", Value: "baz"},
75+
},
76+
Chunks: clientChunks,
77+
},
78+
{
79+
Labels: []client.LabelAdapter{
80+
{Name: "foo", Value: "bar"},
81+
},
82+
Chunks: clientChunks,
83+
},
84+
},
85+
}
86+
ctx := user.InjectOrgID(context.Background(), "0")
87+
queryable := newDistributorQueryable(d, true, mergeChunks)
88+
querier, err := queryable.Querier(ctx, mint, maxt)
89+
require.NoError(t, err)
90+
91+
seriesSet, _, err := querier.Select(&storage.SelectParams{Start: mint, End: maxt})
4092
require.NoError(t, err)
4193

4294
require.True(t, seriesSet.Next())

0 commit comments

Comments
 (0)