Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent chunk metrics #2534

Merged
merged 1 commit into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import (
)

type ChunkMetrics struct {
refs prometheus.Counter
filteredRefs prometheus.Counter
chunks *prometheus.CounterVec
batches prometheus.Histogram
refs *prometheus.CounterVec
series *prometheus.CounterVec
chunks *prometheus.CounterVec
batches *prometheus.HistogramVec
}

const (
statusFiltered = "filtered"
statusMatched = "matched"
statusDiscarded = "discarded"
statusMatched = "matched"
)

func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics {
Expand All @@ -43,33 +43,33 @@ func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics {
}

return &ChunkMetrics{
refs: promauto.With(r).NewCounter(prometheus.CounterOpts{
refs: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "index",
Name: "chunk_refs_pre_filtering_total",
Help: "Number of chunks refs downloaded.",
}),
filteredRefs: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "chunk_refs_total",
Help: "Number of chunks refs downloaded, partitioned by whether they intersect the query bounds.",
}, []string{"status"}),
series: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "index",
Name: "chunk_refs_post_filtering_total",
Help: "Number of chunks refs downloaded whose bounds intersect the query bounds.",
}),
Subsystem: "store",
Name: "series_total",
Help: "Number of series referenced by a query, partitioned by whether they satisfy matchers.",
}, []string{"status"}),
chunks: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "store",
Name: "chunks_downloaded_total",
Help: "Number of chunks downloaded, partitioned by if they satisfy matchers.",
}, []string{"status"}),
batches: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
batches: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "store",
Name: "chunks_per_batch_post_filtering",
Help: "The post-matching chunk batch size.",
Name: "chunks_per_batch",
Help: "The chunk batch size, partitioned by if they satisfy matchers.",

// split buckets evenly across 0->maxBatchSize
Buckets: prometheus.LinearBuckets(0, float64(maxBatchSize/buckets), buckets),
}),
}, []string{"status"}),
}
}

Expand Down Expand Up @@ -581,7 +581,9 @@ func fetchChunkBySeries(ctx context.Context, metrics *ChunkMetrics, chunks []*La
return nil, err
}
metrics.chunks.WithLabelValues(statusMatched).Add(float64(len(allChunks)))
metrics.batches.Observe(float64(len(allChunks)))
metrics.series.WithLabelValues(statusMatched).Add(float64(len(chksBySeries)))
metrics.batches.WithLabelValues(statusMatched).Observe(float64(len(allChunks)))
metrics.batches.WithLabelValues(statusDiscarded).Observe(float64(len(chunks) - len(allChunks)))

return chksBySeries, nil
}
Expand All @@ -591,20 +593,25 @@ func filterSeriesByMatchers(
matchers []*labels.Matcher,
metrics *ChunkMetrics,
) map[model.Fingerprint][][]*LazyChunk {
var filtered int // Number of chunks downlaoded to check labels, but filtered out after.
var filteredSeries, filteredChks int
outer:
for fp, chunks := range chks {
for _, matcher := range matchers {
if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {

delete(chks, fp)
filtered++
filteredSeries++

for _, grp := range chunks {
filteredChks += len(grp)
}

continue outer
}
}
}
metrics.chunks.WithLabelValues(statusFiltered).Add(float64(filtered))
metrics.chunks.WithLabelValues(statusDiscarded).Add(float64(filteredChks))
metrics.series.WithLabelValues(statusDiscarded).Add(float64(filteredSeries))
return chks
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,17 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from
return nil, err
}

var prefilter int
var prefiltered int
var filtered int
for i := range chks {
prefilter += len(chks[i])
prefiltered += len(chks[i])
storeStats.TotalChunksRef += int64(len(chks[i]))
chks[i] = filterChunksByTime(from, through, chks[i])
filtered += len(chks[i])
}
s.chunkMetrics.refs.Add(float64(prefilter))
s.chunkMetrics.filteredRefs.Add(float64(filtered))

s.chunkMetrics.refs.WithLabelValues(statusDiscarded).Add(float64(prefiltered - filtered))
s.chunkMetrics.refs.WithLabelValues(statusMatched).Add(float64(filtered))

// creates lazychunks with chunks ref.
lazyChunks := make([]*LazyChunk, 0, filtered)
Expand Down