cleanup legacy Querier iterators code (take 1) #5825

Merged · 5 commits · Apr 18, 2024
Changes from all commits
23 changes: 0 additions & 23 deletions pkg/querier/chunk_store_queryable.go
@@ -2,32 +2,9 @@ package querier

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/cortexproject/cortex/pkg/chunk"
)

type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator

// Implements SeriesWithChunks
type chunkSeries struct {
labels labels.Labels
chunks []chunk.Chunk
chunkIteratorFunc chunkIteratorFunc
mint, maxt int64
}

func (s *chunkSeries) Labels() labels.Labels {
return s.labels
}

// Iterator returns a new iterator of the data of the series.
func (s *chunkSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return s.chunkIteratorFunc(s.chunks, model.Time(s.mint), model.Time(s.maxt))
}

// Chunks implements SeriesWithChunks interface.
func (s *chunkSeries) Chunks() []chunk.Chunk {
return s.chunks
}
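
The deleted chunkSeries type duplicated what Prometheus already ships as storage.SeriesEntry: a label set plus a lazily invoked iterator constructor. A minimal sketch of the replacement pattern this PR adopts (seriesFromChunks is an illustrative helper name, not code from this PR; chunkIteratorFunc is copied from the deleted file):

// Sketch: building a storage.Series from chunks via storage.SeriesEntry,
// mirroring how this PR replaces the custom chunkSeries type.
import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"

	"github.com/cortexproject/cortex/pkg/chunk"
)

type chunkIteratorFunc func(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator

func seriesFromChunks(ls labels.Labels, chks []chunk.Chunk, iterFn chunkIteratorFunc, minT, maxT int64) storage.Series {
	return &storage.SeriesEntry{
		Lset: ls,
		// SampleIteratorFn is called lazily each time the caller asks for
		// an iterator, so chunks are only decoded on demand.
		SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
			return iterFn(chks, model.Time(minT), model.Time(maxT))
		},
	}
}
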
75 changes: 0 additions & 75 deletions pkg/querier/chunk_store_queryable_test.go
@@ -2,58 +2,16 @@ package querier

import (
"context"
"fmt"
"sort"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"

"github.com/cortexproject/cortex/pkg/chunk"
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
)

// Make sure that chunkSeries implements SeriesWithChunks
var _ SeriesWithChunks = &chunkSeries{}

func TestChunkQueryable(t *testing.T) {
t.Parallel()
opts := promql.EngineOpts{
Logger: log.NewNopLogger(),
MaxSamples: 1e6,
Timeout: 1 * time.Minute,
}
for _, thanosEngine := range []bool{false, true} {
for _, testcase := range testcases {
for _, encoding := range encodings {
for _, query := range queries {
t.Run(fmt.Sprintf("%s/%s/%s/ thanos engine enabled = %t", testcase.name, encoding.name, query.query, thanosEngine), func(t *testing.T) {
var queryEngine promql.QueryEngine
if thanosEngine {
queryEngine = engine.New(engine.Opts{
EngineOpts: opts,
LogicalOptimizers: logicalplan.AllOptimizers,
})
} else {
queryEngine = promql.NewEngine(opts)
}

store, from := makeMockChunkStore(t, 24, encoding.e)
queryable := newMockStoreQueryable(store, testcase.f)
testRangeQuery(t, queryable, queryEngine, from, query)
})
}
}
}
}
}

type mockChunkStore struct {
chunks []chunk.Chunk
}
@@ -91,36 +49,3 @@ func mkChunk(t require.TestingT, mint, maxt model.Time, step time.Duration, enco
}
return chunk.NewChunk(metric, pc, mint, maxt)
}

func TestPartitionChunksOutputIsSortedByLabels(t *testing.T) {
t.Parallel()

var allChunks []chunk.Chunk

const count = 10
// go down, to add series in reversed order
for i := count; i > 0; i-- {
ch := mkChunk(t, model.Time(0), model.Time(1000), time.Millisecond, promchunk.PrometheusXorChunk)
// mkChunk uses `foo` as metric name, so we rename metric to be unique
ch.Metric[0].Value = fmt.Sprintf("%02d", i)

allChunks = append(allChunks, ch)
}

res := partitionChunks(allChunks, 0, 1000, mergeChunks)

// collect labels from each series
var seriesLabels []labels.Labels
for res.Next() {
seriesLabels = append(seriesLabels, res.At().Labels())
}

require.Len(t, seriesLabels, count)
require.True(t, sort.IsSorted(sortedByLabels(seriesLabels)))
}

type sortedByLabels []labels.Labels

func (b sortedByLabels) Len() int { return len(b) }
func (b sortedByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b sortedByLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 }
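
The deleted sortedByLabels type existed only to satisfy sort.Interface for the sort.IsSorted check above. For reference, on Go 1.21+ the same assertion can be written without a named type — a sketch, not code from this PR:

// Sketch: checking that series labels come out sorted, without the
// sort.Interface boilerplate (requires Go 1.21+ for the slices package).
import (
	"slices"

	"github.com/prometheus/prometheus/model/labels"
)

func isSortedByLabels(ls []labels.Labels) bool {
	return slices.IsSortedFunc(ls, func(a, b labels.Labels) int {
		return labels.Compare(a, b)
	})
}
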
12 changes: 6 additions & 6 deletions pkg/querier/distributor_queryable.go
@@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"

"github.com/cortexproject/cortex/pkg/cortexpb"
@@ -169,12 +170,11 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries boo
return storage.ErrSeriesSet(err)
}

serieses = append(serieses, &chunkSeries{
labels: ls,
chunks: chunks,
chunkIteratorFunc: q.chunkIterFn,
mint: minT,
maxt: maxT,
serieses = append(serieses, &storage.SeriesEntry{
Lset: ls,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return q.chunkIterFn(chunks, model.Time(minT), model.Time(maxT))
},
})
}

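For context on how the lazy SampleIteratorFn gets exercised: storage.SeriesEntry satisfies storage.Series, whose Iterator method delegates to the stored function. A hedged consumption sketch (drainSeries is an illustrative helper, not from this PR):

// Sketch: draining one storage.Series, e.g. an element of serieses above.
// Passing nil to Iterator lets SeriesEntry build a fresh iterator via
// SampleIteratorFn.
import (
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func drainSeries(s storage.Series) (n int, err error) {
	it := s.Iterator(nil)
	for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
		if vt == chunkenc.ValFloat {
			_, _ = it.At() // timestamp, value
			n++
		}
	}
	return n, it.Err()
}
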
85 changes: 7 additions & 78 deletions pkg/querier/querier.go
@@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
@@ -433,10 +434,7 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select
}
}

// we have all the sets from different sources (chunk from store, chunks from ingesters,
// time series from store and time series from ingesters).
// mergeSeriesSets will return sorted set.
return q.mergeSeriesSets(result)
return storage.NewMergeSeriesSet(result, storage.ChainedSeriesMerge)
}

// LabelValues implements storage.Querier.
@@ -552,74 +550,6 @@ func (querier) Close() error {
return nil
}

func (q querier) mergeSeriesSets(sets []storage.SeriesSet) storage.SeriesSet {
// Here we deal with sets that are based on chunks and build single set from them.
// Remaining sets are merged with chunks-based one using storage.NewMergeSeriesSet

otherSets := []storage.SeriesSet(nil)
chunks := []chunk.Chunk(nil)

for _, set := range sets {
nonChunkSeries := []storage.Series(nil)

// SeriesSet may have some series backed up by chunks, and some not.
for set.Next() {
s := set.At()

if sc, ok := s.(SeriesWithChunks); ok {
Review comment from @yeya24 (Contributor, Author), Mar 21, 2024:

I think we use SeriesWithChunks only in streamingSelect when querying Ingesters and the merge already happened there.

I am a little bit unsure why we need to get those chunks again and partition them again into a series set using partitionChunks. Isn't it already a series set from the input? We already only get 1 input series set with chunks since we only have 1 distributor querier.

chunks = append(chunks, sc.Chunks()...)
} else {
nonChunkSeries = append(nonChunkSeries, s)
}
}

if err := set.Err(); err != nil {
otherSets = append(otherSets, storage.ErrSeriesSet(err))
} else if len(nonChunkSeries) > 0 {
otherSets = append(otherSets, &sliceSeriesSet{series: nonChunkSeries, ix: -1})
}
}

if len(chunks) == 0 {
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
}

// partitionChunks returns set with sorted series, so it can be used by NewMergeSeriesSet
chunksSet := partitionChunks(chunks, q.mint, q.maxt, q.chunkIterFn)

if len(otherSets) == 0 {
return chunksSet
}

otherSets = append(otherSets, chunksSet)
return storage.NewMergeSeriesSet(otherSets, storage.ChainedSeriesMerge)
}

type sliceSeriesSet struct {
series []storage.Series
ix int
}

func (s *sliceSeriesSet) Next() bool {
s.ix++
return s.ix < len(s.series)
}

func (s *sliceSeriesSet) At() storage.Series {
if s.ix < 0 || s.ix >= len(s.series) {
return nil
}
return s.series[s.ix]
}

func (s *sliceSeriesSet) Err() error {
return nil
}

func (s *sliceSeriesSet) Warnings() annotations.Annotations {
return nil
}

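Why the deletion is safe: now that streamingSelect returns plain storage.Series values (via storage.SeriesEntry), no input set carries SeriesWithChunks anymore, so the chunk-extraction and re-partitioning path in mergeSeriesSets was dead weight — which matches @yeya24's review comment above. All that remains is Prometheus's generic k-way merge; a sketch of the replacement (mergeAll is an illustrative name):

// Sketch: the generic merge that replaces mergeSeriesSets. Each input set
// must already be sorted by labels (partitionChunks and streamingSelect
// guarantee this); ChainedSeriesMerge combines same-label series from
// different sets and deduplicates overlapping samples by timestamp.
import "github.com/prometheus/prometheus/storage"

func mergeAll(sets []storage.SeriesSet) storage.SeriesSet {
	return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
}
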
type storeQueryable struct {
QueryableWithFilter
QueryStoreAfter time.Duration
@@ -726,12 +656,11 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI

series := make([]storage.Series, 0, len(chunksBySeries))
for i := range chunksBySeries {
series = append(series, &chunkSeries{
labels: chunksBySeries[i][0].Metric,
chunks: chunksBySeries[i],
chunkIteratorFunc: iteratorFunc,
mint: mint,
maxt: maxt,
series = append(series, &storage.SeriesEntry{
Lset: chunksBySeries[i][0].Metric,
SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt))
},
})
}

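One subtlety in the new partitionChunks body: SampleIteratorFn is a closure invoked after the loop finishes, so it must capture a per-iteration value of i. On pre-1.22 loop-variable semantics (governed by the module's go directive) the range variable is shared across iterations, and capturing it directly would make every series iterate the last partition's chunks. A sketch of the conventional guard, offered as illustration rather than a claim about what this PR's toolchain required:

// Sketch mirroring the partitionChunks loop above, with an explicit
// per-iteration copy of the loop variable.
func buildSeries(chunksBySeries [][]chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) []storage.Series {
	series := make([]storage.Series, 0, len(chunksBySeries))
	for i := range chunksBySeries {
		i := i // per-iteration copy: redundant on Go 1.22+, required before
		series = append(series, &storage.SeriesEntry{
			Lset: chunksBySeries[i][0].Metric,
			SampleIteratorFn: func(_ chunkenc.Iterator) chunkenc.Iterator {
				// Without the copy, on pre-1.22 semantics every closure
				// would read the final value of i after the loop ends.
				return iteratorFunc(chunksBySeries[i], model.Time(mint), model.Time(maxt))
			},
		})
	}
	return series
}
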
20 changes: 5 additions & 15 deletions pkg/querier/querier_test.go
@@ -31,7 +31,6 @@ import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
"github.com/cortexproject/cortex/pkg/querier/batch"
"github.com/cortexproject/cortex/pkg/querier/iterators"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
@@ -80,15 +79,6 @@ type query struct {
}

var (
testcases = []struct {
name string
f chunkIteratorFunc
}{
{"matrixes", mergeChunks},
{"iterators", iterators.NewChunkMergeIterator},
{"batches", batch.NewChunkMergeIterator},
}

encodings = []struct {
name string
e promchunk.Encoding
@@ -512,7 +502,7 @@ func TestQuerier(t *testing.T) {
cfg.ActiveQueryTrackerDir = ""

chunkStore, through := makeMockChunkStore(t, chunks, encoding.e)
distributor := mockDistibutorFor(t, chunkStore, through)
distributor := mockDistibutorFor(t, chunkStore.chunks)

overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)
@@ -535,8 +525,8 @@ func TestQuerierMetric(t *testing.T) {
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
require.NoError(t, err)

chunkStore, through := makeMockChunkStore(t, 24, promchunk.PrometheusXorChunk)
distributor := mockDistibutorFor(t, chunkStore, through)
chunkStore, _ := makeMockChunkStore(t, 24, promchunk.PrometheusXorChunk)
distributor := mockDistibutorFor(t, chunkStore.chunks)

queryables := []QueryableWithFilter{}
r := prometheus.NewRegistry()
@@ -1207,9 +1197,9 @@ func TestValidateMaxQueryLength(t *testing.T) {

// mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor
// so we can test everything is deduped correctly.
func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor {
func mockDistibutorFor(t *testing.T, cks []chunk.Chunk) *MockDistributor {
//parallel testing causes data race
chunks, err := chunkcompat.ToChunks(cs.chunks)
chunks, err := chunkcompat.ToChunks(cks)
require.NoError(t, err)

tsc := client.TimeSeriesChunk{
15 changes: 0 additions & 15 deletions pkg/querier/series_with_chunks.go

This file was deleted.
