Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Frontend: Expose samples_processed in Server-Timing header #10103

Merged
merged 9 commits into from
Dec 19, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
* [ENHANCEMENT] Ruler: Add `cortex_prometheus_rule_group_last_rule_duration_sum_seconds` metric to track the total evaluation duration of a rule group regardless of concurrency #10189
Expand Down
2 changes: 1 addition & 1 deletion integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
if userID == 0 && cfg.queryStatsEnabled {
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}")
require.NoError(t, err)
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed;val=[0-9.]*$", res.Header.Values("Server-Timing")[0])
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed;val=[0-9.]*, samples_processed;val=[0-9.]*$", res.Header.Values("Server-Timing")[0])
}

// Beyond the range of -querier.query-ingesters-within should return nothing. No need to repeat it for each user.
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func NewQuerierHandler(
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
reg,
nil,
querier.StatsRenderer,
remoteWriteEnabled,
nil,
otlpEnabled,
Expand Down
2 changes: 2 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (f *Handler) reportQueryStats(
"estimated_series_count", stats.GetEstimatedSeriesCount(),
"queue_time_seconds", stats.LoadQueueTime().Seconds(),
"encode_time_seconds", stats.LoadEncodeTime().Seconds(),
"samples_processed", stats.LoadSamplesProcessed(),
}, formatQueryString(details, queryString)...)

if details != nil {
Expand Down Expand Up @@ -485,6 +486,7 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head
parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
parts = append(parts, statsValue("response_time", queryResponseTime))
parts = append(parts, statsValue("bytes_processed", stats.LoadFetchedChunkBytes()+stats.LoadFetchedIndexBytes()))
parts = append(parts, statsValue("samples_processed", stats.GetSamplesProcessed()))
headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
expectedReadConsistency: "",
assertHeaders: func(t *testing.T, headers http.Header) {
assert.Contains(t, headers.Get(ServiceTimingHeaderName), "bytes_processed;val=0")
assert.Contains(t, headers.Get(ServiceTimingHeaderName), "samples_processed;val=0")
},
},
} {
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type mockQuerier struct {
selectFn func(ctx context.Context, sorted bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet
}

func (m mockQuerier) Close() error {
return nil
}

func (m mockQuerier) Select(ctx context.Context, sorted bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if m.selectFn != nil {
return m.selectFn(ctx, sorted, hints, matchers...)
Expand Down
17 changes: 17 additions & 0 deletions pkg/querier/stats/stats.go
tinitiuset marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ func (s *Stats) LoadEncodeTime() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(&s.EncodeTime)))
}

func (s *Stats) AddSamplesProcessed(c uint64) {
if s == nil {
return
}

atomic.AddUint64(&s.SamplesProcessed, c)
}

func (s *Stats) LoadSamplesProcessed() uint64 {
if s == nil {
return 0
}

return atomic.LoadUint64(&s.SamplesProcessed)
}

// Merge the provided Stats into this one.
func (s *Stats) Merge(other *Stats) {
if s == nil || other == nil {
Expand All @@ -219,6 +235,7 @@ func (s *Stats) Merge(other *Stats) {
s.AddEstimatedSeriesCount(other.LoadEstimatedSeriesCount())
s.AddQueueTime(other.LoadQueueTime())
s.AddEncodeTime(other.LoadEncodeTime())
s.AddSamplesProcessed(other.LoadSamplesProcessed())
}

// Copy returns a copy of the stats. Use this rather than regular struct assignment
Expand Down
98 changes: 71 additions & 27 deletions pkg/querier/stats/stats.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/querier/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ message Stats {
google.protobuf.Duration queue_time = 9 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false];
// The time spent at the frontend encoding the query's final results. Does not include time spent serializing results at the querier.
google.protobuf.Duration encode_time = 10 [(gogoproto.stdduration) = true, (gogoproto.nullable) = false];
// TotalSamples represents the total number of samples scanned while evaluating a query.
// This value is taken from the Prometheus engine github.com/prometheus/prometheus/util/stats/query_stats.go
tinitiuset marked this conversation as resolved.
Show resolved Hide resolved
uint64 samples_processed = 11;
}
22 changes: 22 additions & 0 deletions pkg/querier/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func TestStats_QueueTime(t *testing.T) {
})
}

func TestStats_SamplesProcessed(t *testing.T) {
t.Run("add and load samples processed", func(t *testing.T) {
stats, _ := ContextWithEmptyStats(context.Background())
stats.AddSamplesProcessed(10)
stats.AddSamplesProcessed(20)

assert.Equal(t, uint64(30), stats.LoadSamplesProcessed())
})

t.Run("add and load samples processed nil receiver", func(t *testing.T) {
var stats *Stats
stats.AddSamplesProcessed(10)

assert.Equal(t, uint64(0), stats.LoadSamplesProcessed())
})
}

func TestStats_Merge(t *testing.T) {
t.Run("merge two stats objects", func(t *testing.T) {
stats1 := &Stats{}
Expand All @@ -142,6 +159,7 @@ func TestStats_Merge(t *testing.T) {
stats1.AddShardedQueries(20)
stats1.AddSplitQueries(10)
stats1.AddQueueTime(5 * time.Second)
stats1.AddSamplesProcessed(10)

stats2 := &Stats{}
stats2.AddWallTime(time.Second)
Expand All @@ -151,6 +169,7 @@ func TestStats_Merge(t *testing.T) {
stats2.AddShardedQueries(21)
stats2.AddSplitQueries(11)
stats2.AddQueueTime(10 * time.Second)
stats2.AddSamplesProcessed(20)

stats1.Merge(stats2)

Expand All @@ -161,6 +180,7 @@ func TestStats_Merge(t *testing.T) {
assert.Equal(t, uint32(41), stats1.LoadShardedQueries())
assert.Equal(t, uint32(21), stats1.LoadSplitQueries())
assert.Equal(t, 15*time.Second, stats1.LoadQueueTime())
assert.Equal(t, uint64(30), stats1.LoadSamplesProcessed())
})

t.Run("merge two nil stats objects", func(t *testing.T) {
Expand All @@ -176,6 +196,7 @@ func TestStats_Merge(t *testing.T) {
assert.Equal(t, uint32(0), stats1.LoadShardedQueries())
assert.Equal(t, uint32(0), stats1.LoadSplitQueries())
assert.Equal(t, time.Duration(0), stats1.LoadQueueTime())
assert.Equal(t, uint64(0), stats1.LoadSamplesProcessed())
})
}

Expand All @@ -190,6 +211,7 @@ func TestStats_Copy(t *testing.T) {
FetchedIndexBytes: 7,
EstimatedSeriesCount: 8,
QueueTime: 9,
SamplesProcessed: 10,
}
s2 := s1.Copy()
assert.NotSame(t, s1, s2)
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/stats_renderer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querier

import (
"context"

promql_stats "github.com/prometheus/prometheus/util/stats"
prom_api "github.com/prometheus/prometheus/web/api/v1"

"github.com/grafana/mimir/pkg/querier/stats"
)

func StatsRenderer(ctx context.Context, s *promql_stats.Statistics, param string) promql_stats.QueryStats {
tinitiuset marked this conversation as resolved.
Show resolved Hide resolved
mimirStats := stats.FromContext(ctx)
if mimirStats != nil && s != nil {
mimirStats.AddSamplesProcessed(uint64(s.Samples.TotalSamples))
}
return prom_api.DefaultStatsRenderer(ctx, s, param)
}
Loading
Loading