Skip to content

Commit

Permalink
Query: Add tenant label to exported metrics (#6794)
Browse files Browse the repository at this point in the history
* Receive: Add default tenant to HTTP metrics

Previously, if the tenant header was empty/not supplied, the exported
metrics would have an empty string as tenant. With this commit we
instead use the default tenant as can be configured with:
`--receive.default-tenant-id`.

Signed-off-by: Jacob Baungard Hansen <jacobbaungard@redhat.com>

* Query: Add tenant label to exported metrics

With this commit we now add the tenant label to relevant metrics
exported by the query component.

This includes the HTTP metrics handled by the InstrumentationMiddleware
and the query latency metrics.

Signed-off-by: Jacob Baungard Hansen <jacobbaungard@redhat.com>

---------

Signed-off-by: Jacob Baungard Hansen <jacobbaungard@redhat.com>
  • Loading branch information
jacobbaungard committed Oct 21, 2023
1 parent e195df9 commit ea746be
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway.
- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`.
- [#6773](https://github.com/thanos-io/thanos/pull/6773) Index Cache: Add `ttl` to control the ttl to store items in remote index caches like memcached and redis.
- [#6794](https://github.com/thanos-io/thanos/pull/6794) Query: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.

### Changed

- [#6698](https://github.com/thanos-io/thanos/pull/6608) Receive: Change write log level from warn to info.
- [#6753](https://github.com/thanos-io/thanos/pull/6753) mixin(Rule): *breaking :warning:* Fixed the mixin rules with duplicate names and updated the promtool version from v0.37.0 to v0.47.0
- [#6772](https://github.com/thanos-io/thanos/pull/6772) *: Bump prometheus to v0.47.2-0.20231006112807-a5a4eab679cc
- [#6794](https://github.com/thanos-io/thanos/pull/6794) Receive: the exported HTTP metrics now uses the specified default tenant for requests where no tenants are found.

### Removed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ func runQuery(
// Configure Request Logging for HTTP calls.
logMiddleware := logging.NewHTTPServerMiddleware(logger, httpLogOpts...)

ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins)

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close
}

aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator()
aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator(tenant)
for i := range seriesStats {
aggregator.Aggregate(seriesStats[i])
}
Expand Down Expand Up @@ -1007,7 +1007,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
}
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close
}
aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator()
aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator(tenant)
for i := range seriesStats {
aggregator.Aggregate(seriesStats[i])
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/extprom/http/instrument_tenant_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import (
type tenantInstrumentationMiddleware struct {
metrics *defaultMetrics
tenantHeaderName string
defaultTenant string
}

// NewTenantInstrumentationMiddleware provides the same instrumentation as defaultInstrumentationMiddleware,
// but with a tenant label fetched from the given tenantHeaderName header.
// Passing nil as buckets uses the default buckets.
func NewTenantInstrumentationMiddleware(tenantHeaderName string, reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
func NewTenantInstrumentationMiddleware(tenantHeaderName string, defaultTenant string, reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware {
return &tenantInstrumentationMiddleware{
tenantHeaderName: tenantHeaderName,
defaultTenant: defaultTenant,
metrics: newDefaultMetrics(reg, buckets, []string{"tenant"}),
}
}
Expand All @@ -33,6 +35,9 @@ func NewTenantInstrumentationMiddleware(tenantHeaderName string, reg prometheus.
func (ins *tenantInstrumentationMiddleware) NewHandler(handlerName string, next http.Handler) http.HandlerFunc {
tenantWrapper := func(w http.ResponseWriter, r *http.Request) {
tenant := r.Header.Get(ins.tenantHeaderName)
if tenant == "" {
tenant = ins.defaultTenant
}
baseLabels := prometheus.Labels{"handler": handlerName, "tenant": tenant}
handlerStack := httpInstrumentationHandler(baseLabels, ins.metrics, next)
handlerStack.ServeHTTP(w, r)
Expand Down
1 change: 1 addition & 0 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
if o.Registry != nil {
ins = extpromhttp.NewTenantInstrumentationMiddleware(
o.TenantHeader,
o.DefaultTenantID,
o.Registry,
[]float64{0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.25, 0.5, 0.75, 1, 2, 3, 4, 5},
)
Expand Down
13 changes: 8 additions & 5 deletions pkg/store/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type seriesStatsAggregator struct {
queryDuration *prometheus.HistogramVec
seriesLeBuckets []float64
samplesLeBuckets []float64
tenant string

seriesStats storepb.SeriesStatsCounter
}
Expand All @@ -29,11 +30,12 @@ type seriesStatsAggregatorFactory struct {
samplesLeBuckets []float64
}

func (f *seriesStatsAggregatorFactory) NewAggregator() SeriesQueryPerformanceMetricsAggregator {
func (f *seriesStatsAggregatorFactory) NewAggregator(tenant string) SeriesQueryPerformanceMetricsAggregator {
return &seriesStatsAggregator{
queryDuration: f.queryDuration,
seriesLeBuckets: f.seriesLeBuckets,
samplesLeBuckets: f.samplesLeBuckets,
tenant: tenant,
seriesStats: storepb.SeriesStatsCounter{},
}
}
Expand All @@ -49,7 +51,7 @@ func NewSeriesStatsAggregatorFactory(
Name: "thanos_store_api_query_duration_seconds",
Help: "Duration of the Thanos Store API select phase for a query.",
Buckets: durationQuantiles,
}, []string{"series_le", "samples_le"}),
}, []string{"series_le", "samples_le", "tenant"}),
seriesLeBuckets: seriesQuantiles,
samplesLeBuckets: sampleQuantiles,
}
Expand All @@ -67,7 +69,7 @@ func NewSeriesStatsAggregator(
Name: "thanos_store_api_query_duration_seconds",
Help: "Duration of the Thanos Store API select phase for a query.",
Buckets: durationQuantiles,
}, []string{"series_le", "samples_le"}),
}, []string{"series_le", "samples_le", "tenant"}),
seriesLeBuckets: seriesQuantiles,
samplesLeBuckets: sampleQuantiles,
seriesStats: storepb.SeriesStatsCounter{},
Expand All @@ -92,6 +94,7 @@ func (s *seriesStatsAggregator) Observe(duration float64) {
s.queryDuration.With(prometheus.Labels{
"series_le": seriesLeBucket,
"samples_le": samplesLeBucket,
"tenant": s.tenant,
}).Observe(duration)
s.reset()
}
Expand All @@ -115,7 +118,7 @@ func findBucket(value float64, quantiles []float64) string {
}

type SeriesQueryPerformanceMetricsAggregatorFactory interface {
NewAggregator() SeriesQueryPerformanceMetricsAggregator
NewAggregator(tenant string) SeriesQueryPerformanceMetricsAggregator
}

type SeriesQueryPerformanceMetricsAggregator interface {
Expand All @@ -133,7 +136,7 @@ func (s *NoopSeriesStatsAggregator) Observe(_ float64) {}
// NoopSeriesStatsAggregatorFactory is a query performance series aggregator factory that does nothing.
type NoopSeriesStatsAggregatorFactory struct{}

func (s *NoopSeriesStatsAggregatorFactory) NewAggregator() SeriesQueryPerformanceMetricsAggregator {
func (s *NoopSeriesStatsAggregatorFactory) NewAggregator(tenant string) SeriesQueryPerformanceMetricsAggregator {
return &NoopSeriesStatsAggregator{}
}

Expand Down
121 changes: 118 additions & 3 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/matchers"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -799,13 +800,22 @@ func TestQueryStoreMetrics(t *testing.T) {
Deduplicate: true,
}, 10001)
testutil.Ok(t, err)

// query with a non-default tenant
instantQuery(t, ctx, querier.Endpoint("http"), func() string {
return "max_over_time(one_series{instance='foo_0'}[2h])"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 1)
testutil.Ok(t, err)
}

mon, err := e2emon.Start(e)
testutil.Ok(t, err)

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='100000',series_le='10000'}"
return "thanos_store_api_query_duration_seconds_count{samples_le='100000',series_le='10000',tenant='default-tenant'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -816,13 +826,14 @@ func TestQueryStoreMetrics(t *testing.T) {
"job": "querier-1",
"samples_le": "100000",
"series_le": "10000",
"tenant": "default-tenant",
},
Value: model.SampleValue(1),
},
})

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10'}"
return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10',tenant='default-tenant'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -833,13 +844,14 @@ func TestQueryStoreMetrics(t *testing.T) {
"job": "querier-1",
"samples_le": "100",
"series_le": "10",
"tenant": "default-tenant",
},
Value: model.SampleValue(1),
},
})

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='+Inf',series_le='+Inf'}"
return "thanos_store_api_query_duration_seconds_count{samples_le='+Inf',series_le='+Inf',tenant='default-tenant'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -850,6 +862,25 @@ func TestQueryStoreMetrics(t *testing.T) {
"job": "querier-1",
"samples_le": "+Inf",
"series_le": "+Inf",
"tenant": "default-tenant",
},
Value: model.SampleValue(1),
},
})

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10',tenant='test-tenant-1'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"__name__": "thanos_store_api_query_duration_seconds_count",
"instance": "storemetrics01-querier-1:8080",
"job": "querier-1",
"samples_le": "100",
"series_le": "10",
"tenant": "test-tenant-1",
},
Value: model.SampleValue(1),
},
Expand Down Expand Up @@ -2304,3 +2335,87 @@ func TestSidecarPrefersExtLabels(t *testing.T) {
Timestamp: model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()),
}}, retv)
}

func TestTenantHTTPMetrics(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("tenant-metrics")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// scrape the local prometheus, and our querier metrics
prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget, "tenant-metrics-querier-1:8080"), "", e2ethanos.DefaultPrometheusImage(), "")

q := e2ethanos.NewQuerierBuilder(e, "1", sidecar1.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))

// Query once with default-tenant to ensure everything is ready
// for the following requests
instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "prometheus_api_remote_read_queries"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, 1)
testutil.Ok(t, err)

// Query a few times with tenant 1
instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "prometheus_api_remote_read_queries"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 1)
testutil.Ok(t, err)

instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "go_goroutines"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 2)
testutil.Ok(t, err)

instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "go_memstats_frees_total"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 2)
testutil.Ok(t, err)

// query just once with tenant-2
instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "go_memstats_heap_alloc_bytes"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-2"}},
}, 2)
testutil.Ok(t, err)

// tenant-1 made 3 requests
tenant1Matcher, err := matchers.NewMatcher(matchers.MatchEqual, "tenant", "test-tenant-1")
testutil.Ok(t, err)
testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(3),
[]string{"http_requests_total"}, e2emon.WithLabelMatchers(
tenant1Matcher,
),
e2emon.WaitMissingMetrics(),
))

// tenant 2 just made one request
tenant2Matcher, err := matchers.NewMatcher(matchers.MatchEqual, "tenant", "test-tenant-2")
testutil.Ok(t, err)
testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(1),
[]string{"http_requests_total"}, e2emon.WithLabelMatchers(
tenant2Matcher,
),
e2emon.WaitMissingMetrics(),
))
}

0 comments on commit ea746be

Please sign in to comment.