Skip to content

Commit 00ffb3c

Browse files
authored
cleanup legacy Querier iterators code (take 1) (#5825)
* cleanup unnecessary iterators code Signed-off-by: Ben Ye <benye@amazon.com> * remove one more test Signed-off-by: Ben Ye <benye@amazon.com> * cleanup Signed-off-by: Ben Ye <benye@amazon.com> * remove unused code Signed-off-by: Ben Ye <benye@amazon.com> * fix lint Signed-off-by: Ben Ye <benye@amazon.com> --------- Signed-off-by: Ben Ye <benye@amazon.com>
1 parent 6bd8d82 commit 00ffb3c

File tree

8 files changed

+18
-341
lines changed

8 files changed

+18
-341
lines changed

pkg/querier/chunk_store_queryable.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,9 @@ package querier
22

33
import (
44
"github.com/prometheus/common/model"
5-
"github.com/prometheus/prometheus/model/labels"
65
"github.com/prometheus/prometheus/tsdb/chunkenc"
76

87
"github.com/cortexproject/cortex/pkg/chunk"
98
)
109

1110
type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator
12-
13-
// Implements SeriesWithChunks
14-
type chunkSeries struct {
15-
labels labels.Labels
16-
chunks []chunk.Chunk
17-
chunkIteratorFunc chunkIteratorFunc
18-
mint, maxt int64
19-
}
20-
21-
func (s *chunkSeries) Labels() labels.Labels {
22-
return s.labels
23-
}
24-
25-
// Iterator returns a new iterator of the data of the series.
26-
func (s *chunkSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
27-
return s.chunkIteratorFunc(s.chunks, model.Time(s.mint), model.Time(s.maxt))
28-
}
29-
30-
// Chunks implements SeriesWithChunks interface.
31-
func (s *chunkSeries) Chunks() []chunk.Chunk {
32-
return s.chunks
33-
}

pkg/querier/chunk_store_queryable_test.go

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,58 +2,16 @@ package querier
22

33
import (
44
"context"
5-
"fmt"
6-
"sort"
7-
"testing"
85
"time"
96

10-
"github.com/go-kit/log"
117
"github.com/prometheus/common/model"
128
"github.com/prometheus/prometheus/model/labels"
13-
"github.com/prometheus/prometheus/promql"
149
"github.com/stretchr/testify/require"
15-
"github.com/thanos-io/promql-engine/engine"
16-
"github.com/thanos-io/promql-engine/logicalplan"
1710

1811
"github.com/cortexproject/cortex/pkg/chunk"
1912
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
2013
)
2114

22-
// Make sure that chunkSeries implements SeriesWithChunks
23-
var _ SeriesWithChunks = &chunkSeries{}
24-
25-
func TestChunkQueryable(t *testing.T) {
26-
t.Parallel()
27-
opts := promql.EngineOpts{
28-
Logger: log.NewNopLogger(),
29-
MaxSamples: 1e6,
30-
Timeout: 1 * time.Minute,
31-
}
32-
for _, thanosEngine := range []bool{false, true} {
33-
for _, testcase := range testcases {
34-
for _, encoding := range encodings {
35-
for _, query := range queries {
36-
t.Run(fmt.Sprintf("%s/%s/%s/ thanos engine enabled = %t", testcase.name, encoding.name, query.query, thanosEngine), func(t *testing.T) {
37-
var queryEngine promql.QueryEngine
38-
if thanosEngine {
39-
queryEngine = engine.New(engine.Opts{
40-
EngineOpts: opts,
41-
LogicalOptimizers: logicalplan.AllOptimizers,
42-
})
43-
} else {
44-
queryEngine = promql.NewEngine(opts)
45-
}
46-
47-
store, from := makeMockChunkStore(t, 24, encoding.e)
48-
queryable := newMockStoreQueryable(store, testcase.f)
49-
testRangeQuery(t, queryable, queryEngine, from, query)
50-
})
51-
}
52-
}
53-
}
54-
}
55-
}
56-
5715
type mockChunkStore struct {
5816
chunks []chunk.Chunk
5917
}
@@ -91,36 +49,3 @@ func mkChunk(t require.TestingT, mint, maxt model.Time, step time.Duration, enco
9149
}
9250
return chunk.NewChunk(metric, pc, mint, maxt)
9351
}
94-
95-
func TestPartitionChunksOutputIsSortedByLabels(t *testing.T) {
96-
t.Parallel()
97-
98-
var allChunks []chunk.Chunk
99-
100-
const count = 10
101-
// go down, to add series in reversed order
102-
for i := count; i > 0; i-- {
103-
ch := mkChunk(t, model.Time(0), model.Time(1000), time.Millisecond, promchunk.PrometheusXorChunk)
104-
// mkChunk uses `foo` as metric name, so we rename metric to be unique
105-
ch.Metric[0].Value = fmt.Sprintf("%02d", i)
106-
107-
allChunks = append(allChunks, ch)
108-
}
109-
110-
res := partitionChunks(allChunks, 0, 1000, mergeChunks)
111-
112-
// collect labels from each series
113-
var seriesLabels []labels.Labels
114-
for res.Next() {
115-
seriesLabels = append(seriesLabels, res.At().Labels())
116-
}
117-
118-
require.Len(t, seriesLabels, count)
119-
require.True(t, sort.IsSorted(sortedByLabels(seriesLabels)))
120-
}
121-
122-
type sortedByLabels []labels.Labels
123-
124-
func (b sortedByLabels) Len() int { return len(b) }
125-
func (b sortedByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
126-
func (b sortedByLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 }

pkg/querier/distributor_queryable.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/prometheus/prometheus/model/labels"
1212
"github.com/prometheus/prometheus/scrape"
1313
"github.com/prometheus/prometheus/storage"
14+
"github.com/prometheus/prometheus/tsdb/chunkenc"
1415
"github.com/prometheus/prometheus/util/annotations"
1516

1617
"github.com/cortexproject/cortex/pkg/cortexpb"
@@ -168,12 +169,11 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo
168169
return storage.ErrSeriesSet(err)
169170
}
170171

171-
serieses = append(serieses, &chunkSeries{
172-
labels: ls,
173-
chunks: chunks,
174-
chunkIteratorFunc: q.chunkIterFn,
175-
mint: minT,
176-
maxt: maxT,
172+
serieses = append(serieses, &storage.SeriesEntry{
173+
Lset: ls,
174+
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
175+
return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT))
176+
},
177177
})
178178
}
179179

pkg/querier/querier.go

Lines changed: 7 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/prometheus/prometheus/model/labels"
1818
"github.com/prometheus/prometheus/promql"
1919
"github.com/prometheus/prometheus/storage"
20+
"github.com/prometheus/prometheus/tsdb/chunkenc"
2021
"github.com/prometheus/prometheus/util/annotations"
2122
"github.com/thanos-io/promql-engine/engine"
2223
"github.com/thanos-io/promql-engine/logicalplan"
@@ -433,10 +434,7 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select
433434
}
434435
}
435436

436-
// we have all the sets from different sources (chunk from store, chunks from ingesters,
437-
// time series from store and time series from ingesters).
438-
// mergeSeriesSets will return sorted set.
439-
return q.mergeSeriesSets(result)
437+
return storage.NewMergeSeriesSet(result, storage.ChainedSeriesMerge)
440438
}
441439

442440
// LabelValues implements storage.Querier.
@@ -552,74 +550,6 @@ func (querier) Close() error {
552550
return nil
553551
}
554552

555-
func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
556-
// Here we deal with sets that are based on chunks and build single set from them.
557-
// Remaining sets are merged with chunks-based one using storage.NewMergeSeriesSet
558-
559-
otherSets := []storage.SeriesSet(nil)
560-
chunks := []chunk.Chunk(nil)
561-
562-
for _, set := range sets {
563-
nonChunkSeries := []storage.Series(nil)
564-
565-
// SeriesSet may have some series backed up by chunks, and some not.
566-
for set.Next() {
567-
s := set.At()
568-
569-
if sc, ok := s.(SeriesWithChunks); ok {
570-
chunks = append(chunks, sc.Chunks()...)
571-
} else {
572-
nonChunkSeries = append(nonChunkSeries, s)
573-
}
574-
}
575-
576-
if err := set.Err(); err != nil {
577-
otherSets = append(otherSets, storage.ErrSeriesSet(err))
578-
} else if len(nonChunkSeries) > 0 {
579-
otherSets = append(otherSets, &sliceSeriesSet{series: nonChunkSeries, ix: -1})
580-
}
581-
}
582-
583-
if len(chunks) == 0 {
584-
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
585-
}
586-
587-
// partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet
588-
chunksSet := partitionChunks(chunks, q.mint, q.maxt, q.chunkIterFn)
589-
590-
if len(otherSets) == 0 {
591-
return chunksSet
592-
}
593-
594-
otherSets = append(otherSets, chunksSet)
595-
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
596-
}
597-
598-
type sliceSeriesSet struct {
599-
series []storage.Series
600-
ix int
601-
}
602-
603-
func (s *sliceSeriesSet) Next() bool {
604-
s.ix++
605-
return s.ix < len(s.series)
606-
}
607-
608-
func (s *sliceSeriesSet) At() storage.Series {
609-
if s.ix < 0 || s.ix >= len(s.series) {
610-
return nil
611-
}
612-
return s.series[s.ix]
613-
}
614-
615-
func (s *sliceSeriesSet) Err() error {
616-
return nil
617-
}
618-
619-
func (s *sliceSeriesSet) Warnings() annotations.Annotations {
620-
return nil
621-
}
622-
623553
type storeQueryable struct {
624554
QueryableWithFilter
625555
QueryStoreAfter time.Duration
@@ -726,12 +656,11 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI
726656

727657
series := make([]storage.Series, 0, len(chunksBySeries))
728658
for i := range chunksBySeries {
729-
series = append(series, &chunkSeries{
730-
labels: chunksBySeries[i][0].Metric,
731-
chunks: chunksBySeries[i],
732-
chunkIteratorFunc: iteratorFunc,
733-
mint: mint,
734-
maxt: maxt,
659+
series = append(series, &storage.SeriesEntry{
660+
Lset: chunksBySeries[i][0].Metric,
661+
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
662+
return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt))
663+
},
735664
})
736665
}
737666

pkg/querier/querier_test.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/cortexproject/cortex/pkg/ingester/client"
3232
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
3333
"github.com/cortexproject/cortex/pkg/querier/batch"
34-
"github.com/cortexproject/cortex/pkg/querier/iterators"
3534
"github.com/cortexproject/cortex/pkg/tenant"
3635
"github.com/cortexproject/cortex/pkg/util"
3736
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
@@ -80,15 +79,6 @@ type query struct {
8079
}
8180

8281
var (
83-
testcases = []struct {
84-
name string
85-
f chunkIteratorFunc
86-
}{
87-
{"matrixes", mergeChunks},
88-
{"iterators", iterators.NewChunkMergeIterator},
89-
{"batches", batch.NewChunkMergeIterator},
90-
}
91-
9282
encodings = []struct {
9383
name string
9484
e promchunk.Encoding
@@ -512,7 +502,7 @@ func TestQuerier(t *testing.T) {
512502
cfg.ActiveQueryTrackerDir = ""
513503

514504
chunkStore, through := makeMockChunkStore(t, chunks, encoding.e)
515-
distributor := mockDistibutorFor(t, chunkStore, through)
505+
distributor := mockDistibutorFor(t, chunkStore.chunks)
516506

517507
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
518508
require.NoError(t, err)
@@ -535,8 +525,8 @@ func TestQuerierMetric(t *testing.T) {
535525
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
536526
require.NoError(t, err)
537527

538-
chunkStore, through := makeMockChunkStore(t, 24, promchunk.PrometheusXorChunk)
539-
distributor := mockDistibutorFor(t, chunkStore, through)
528+
chunkStore, _ := makeMockChunkStore(t, 24, promchunk.PrometheusXorChunk)
529+
distributor := mockDistibutorFor(t, chunkStore.chunks)
540530

541531
queryables := []QueryableWithFilter{}
542532
r := prometheus.NewRegistry()
@@ -1207,9 +1197,9 @@ func TestValidateMaxQueryLength(t *testing.T) {
12071197

12081198
// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
12091199
// so we can test everything is deduped correctly.
1210-
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor {
1200+
func mockDistibutorFor(t *testing.T, cks []chunk.Chunk) *MockDistributor {
12111201
//parallel testing causes data race
1212-
chunks, err := chunkcompat.ToChunks(cs.chunks)
1202+
chunks, err := chunkcompat.ToChunks(cks)
12131203
require.NoError(t, err)
12141204

12151205
tsc := client.TimeSeriesChunk{

pkg/querier/series_with_chunks.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)