diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 01643d98d3a61..c2e83063a0e5b 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -11,6 +11,8 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" @@ -22,6 +24,55 @@ import ( "github.com/grafana/loki/pkg/logql/stats" ) +type ChunkMetrics struct { + refs prometheus.Counter + filteredRefs prometheus.Counter + chunks *prometheus.CounterVec + batches prometheus.Histogram +} + +const ( + statusFiltered = "filtered" + statusMatched = "matched" +) + +func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics { + buckets := 5 + if maxBatchSize < buckets { + maxBatchSize = buckets + } + + return &ChunkMetrics{ + refs: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: "loki", + Subsystem: "index", + Name: "chunk_refs_pre_filtering_total", + Help: "Number of chunks refs downloaded.", + }), + filteredRefs: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: "loki", + Subsystem: "index", + Name: "chunk_refs_post_filtering_total", + Help: "Number of chunks refs downloaded whose bounds intersect the query bounds.", + }), + chunks: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Subsystem: "store", + Name: "chunks_downloaded_total", + Help: "Number of chunks downloaded, partitioned by if they satisfy matchers.", + }, []string{"status"}), + batches: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki", + Subsystem: "store", + Name: "chunks_per_batch_post_filtering", + Help: "The post-matching chunk batch size.", + + // split buckets evenly across 0->maxBatchSize; the extra (buckets+1) bucket makes maxBatchSize itself an upper bound + Buckets: prometheus.LinearBuckets(0, 
float64(maxBatchSize/buckets), buckets+1), + }), + } +} + type genericIterator interface { Next() bool Labels() string @@ -297,6 +348,7 @@ type logBatchIterator struct { *batchChunkIterator ctx context.Context + metrics *ChunkMetrics matchers []*labels.Matcher filter logql.LineFilter labels labelCache @@ -304,6 +356,7 @@ type logBatchIterator struct { func newLogBatchIterator( ctx context.Context, + metrics *ChunkMetrics, chunks []*LazyChunk, batchSize int, matchers []*labels.Matcher, @@ -319,6 +372,7 @@ func newLogBatchIterator( labels: map[model.Fingerprint]string{}, matchers: matchers, filter: filter, + metrics: metrics, ctx: ctx, } @@ -336,7 +390,7 @@ func (it *logBatchIterator) Entry() logproto.Entry { // newChunksIterator creates an iterator over a set of lazychunks. func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) { - chksBySeries, err := fetchChunkBySeries(it.ctx, chunks, it.matchers) + chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, chunks, it.matchers) if err != nil { return nil, err } @@ -396,6 +450,7 @@ type sampleBatchIterator struct { *batchChunkIterator ctx context.Context + metrics *ChunkMetrics matchers []*labels.Matcher filter logql.LineFilter extractor logql.SampleExtractor @@ -404,6 +459,7 @@ type sampleBatchIterator struct { func newSampleBatchIterator( ctx context.Context, + metrics *ChunkMetrics, chunks []*LazyChunk, batchSize int, matchers []*labels.Matcher, @@ -421,6 +477,7 @@ func newSampleBatchIterator( matchers: matchers, filter: filter, extractor: extractor, + metrics: metrics, ctx: ctx, } batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator) @@ -438,7 +495,7 @@ func (it *sampleBatchIterator) Sample() logproto.Sample { // newChunksIterator creates an iterator over a set of lazychunks. 
func (it *sampleBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) { - chksBySeries, err := fetchChunkBySeries(it.ctx, chunks, it.matchers) + chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, chunks, it.matchers) if err != nil { return nil, err } @@ -499,7 +556,7 @@ func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels return matchers } -func fetchChunkBySeries(ctx context.Context, chunks []*LazyChunk, matchers []*labels.Matcher) (map[model.Fingerprint][][]*LazyChunk, error) { +func fetchChunkBySeries(ctx context.Context, metrics *ChunkMetrics, chunks []*LazyChunk, matchers []*labels.Matcher) (map[model.Fingerprint][][]*LazyChunk, error) { chksBySeries := partitionBySeriesChunks(chunks) // Make sure the initial chunks are loaded. This is not one chunk @@ -510,7 +567,7 @@ func fetchChunkBySeries(ctx context.Context, chunks []*LazyChunk, matchers []*la // Now that we have the first chunk for each series loaded, // we can proceed to filter the series that don't match. 
- chksBySeries = filterSeriesByMatchers(chksBySeries, matchers) + chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, metrics) var allChunks []*LazyChunk for _, series := range chksBySeries { @@ -523,19 +580,31 @@ func fetchChunkBySeries(ctx context.Context, chunks []*LazyChunk, matchers []*la if err := fetchLazyChunks(ctx, allChunks); err != nil { return nil, err } + metrics.chunks.WithLabelValues(statusMatched).Add(float64(len(allChunks))) + metrics.batches.Observe(float64(len(allChunks))) + return chksBySeries, nil } -func filterSeriesByMatchers(chks map[model.Fingerprint][][]*LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*LazyChunk { +func filterSeriesByMatchers( + chks map[model.Fingerprint][][]*LazyChunk, + matchers []*labels.Matcher, + metrics *ChunkMetrics, +) map[model.Fingerprint][][]*LazyChunk { + var filtered int // Number of chunks downloaded to check labels, but filtered out after. outer: for fp, chunks := range chks { for _, matcher := range matchers { if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) { + delete(chks, fp) + filtered++ + continue outer } } } + metrics.chunks.WithLabelValues(statusFiltered).Add(float64(filtered)) return chks } diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 3acd430314629..6cf0e4b0c2186 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -22,6 +22,8 @@ import ( "github.com/grafana/loki/pkg/logql/stats" ) +var NilMetrics = NewChunkMetrics(nil, 0) + func Test_batchIterSafeStart(t *testing.T) { stream := logproto.Stream{ Labels: fooLabelsWithName, @@ -955,7 +957,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newLogBatchIterator(context.Background(), tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, tt.direction, tt.start, tt.end) + it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, 
newMatchers(tt.matchers), nil, tt.direction, tt.start, tt.end) require.NoError(t, err) streams, _, err := iter.ReadBatch(it, 1000) _ = it.Close() @@ -1240,7 +1242,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newSampleBatchIterator(context.Background(), tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, logql.ExtractCount, tt.start, tt.end) + it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, logql.ExtractCount, tt.start, tt.end) require.NoError(t, err) series, _, err := iter.ReadSampleBatch(it, 1000) _ = it.Close() diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 7b5086ba3f1e0..9f65e306d9c24 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -76,7 +76,8 @@ type Store interface { type store struct { chunk.Store - cfg Config + cfg Config + chunkMetrics *ChunkMetrics } // NewStore creates a new Loki Store using configuration supplied. @@ -86,8 +87,9 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg SchemaConfig, li return nil, err } return &store{ - Store: s, - cfg: cfg, + Store: s, + cfg: cfg, + chunkMetrics: NewChunkMetrics(registerer, cfg.MaxChunkBatchSize), }, nil } @@ -162,14 +164,19 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from return nil, err } - var totalChunks int + var prefilter int + var filtered int for i := range chks { + prefilter += len(chks[i]) storeStats.TotalChunksRef += int64(len(chks[i])) chks[i] = filterChunksByTime(from, through, chks[i]) - totalChunks += len(chks[i]) + filtered += len(chks[i]) } + s.chunkMetrics.refs.Add(float64(prefilter)) + s.chunkMetrics.filteredRefs.Add(float64(filtered)) + // creates lazychunks with chunks ref. 
- lazyChunks := make([]*LazyChunk, 0, totalChunks) + lazyChunks := make([]*LazyChunk, 0, filtered) for i := range chks { for _, c := range chks[i] { lazyChunks = append(lazyChunks, &LazyChunk{Chunk: c, Fetcher: fetchers[i]}) @@ -275,7 +282,7 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter return iter.NoopIterator, nil } - return newLogBatchIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.Direction, req.Start, req.End) + return newLogBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.Direction, req.Start, req.End) } @@ -303,7 +310,7 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) if len(lazyChunks) == 0 { return iter.NoopIterator, nil } - return newSampleBatchIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, extractor, req.Start, req.End) + return newSampleBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, extractor, req.Start, req.End) } func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk { diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 5530c7a4de6e1..264e50b040c78 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -382,6 +382,7 @@ func Test_store_SelectLogs(t *testing.T) { cfg: Config{ MaxChunkBatchSize: 10, }, + chunkMetrics: NilMetrics, } ctx = user.InjectOrgID(context.Background(), "test-user") @@ -591,6 +592,7 @@ func Test_store_SelectSample(t *testing.T) { cfg: Config{ MaxChunkBatchSize: 10, }, + chunkMetrics: NilMetrics, } ctx = user.InjectOrgID(context.Background(), "test-user") @@ -660,6 +662,7 @@ func Test_store_GetSeries(t *testing.T) { cfg: Config{ MaxChunkBatchSize: tt.batchSize, }, + chunkMetrics: NilMetrics, } ctx = user.InjectOrgID(context.Background(), "test-user") out, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: tt.req})