Skip to content

Commit

Permalink
Query: Add tenant label to exported metrics
Browse files Browse the repository at this point in the history
With this commit we now add the tenant label to relevant metrics
exported by the query component.

This includes the HTTP metrics handled by the InstrumentationMiddleware
and the query latency metrics.

Signed-off-by: Jacob Baungard Hansen <jacobbaungard@redhat.com>
  • Loading branch information
jacobbaungard committed Oct 12, 2023
1 parent cf29750 commit e3d727d
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ func runQuery(
// Configure Request Logging for HTTP calls.
logMiddleware := logging.NewHTTPServerMiddleware(logger, httpLogOpts...)

ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins)

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close
}

aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator()
aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator(tenant)
for i := range seriesStats {
aggregator.Aggregate(seriesStats[i])
}
Expand Down Expand Up @@ -1007,7 +1007,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
}
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: res.Err}, qry.Close
}
aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator()
aggregator := qapi.seriesStatsAggregatorFactory.NewAggregator(tenant)
for i := range seriesStats {
aggregator.Aggregate(seriesStats[i])
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/store/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type seriesStatsAggregator struct {
queryDuration *prometheus.HistogramVec
seriesLeBuckets []float64
samplesLeBuckets []float64
tenant string

seriesStats storepb.SeriesStatsCounter
}
Expand All @@ -29,11 +30,12 @@ type seriesStatsAggregatorFactory struct {
samplesLeBuckets []float64
}

func (f *seriesStatsAggregatorFactory) NewAggregator() SeriesQueryPerformanceMetricsAggregator {
func (f *seriesStatsAggregatorFactory) NewAggregator(tenant string) SeriesQueryPerformanceMetricsAggregator {
return &seriesStatsAggregator{
queryDuration: f.queryDuration,
seriesLeBuckets: f.seriesLeBuckets,
samplesLeBuckets: f.samplesLeBuckets,
tenant: tenant,
seriesStats: storepb.SeriesStatsCounter{},
}
}
Expand All @@ -49,7 +51,7 @@ func NewSeriesStatsAggregatorFactory(
Name: "thanos_store_api_query_duration_seconds",
Help: "Duration of the Thanos Store API select phase for a query.",
Buckets: durationQuantiles,
}, []string{"series_le", "samples_le"}),
}, []string{"series_le", "samples_le", "tenant"}),
seriesLeBuckets: seriesQuantiles,
samplesLeBuckets: sampleQuantiles,
}
Expand All @@ -67,7 +69,7 @@ func NewSeriesStatsAggregator(
Name: "thanos_store_api_query_duration_seconds",
Help: "Duration of the Thanos Store API select phase for a query.",
Buckets: durationQuantiles,
}, []string{"series_le", "samples_le"}),
}, []string{"series_le", "samples_le", "tenant"}),
seriesLeBuckets: seriesQuantiles,
samplesLeBuckets: sampleQuantiles,
seriesStats: storepb.SeriesStatsCounter{},
Expand All @@ -92,6 +94,7 @@ func (s *seriesStatsAggregator) Observe(duration float64) {
s.queryDuration.With(prometheus.Labels{
"series_le": seriesLeBucket,
"samples_le": samplesLeBucket,
"tenant": s.tenant,
}).Observe(duration)
s.reset()
}
Expand All @@ -115,7 +118,7 @@ func findBucket(value float64, quantiles []float64) string {
}

type SeriesQueryPerformanceMetricsAggregatorFactory interface {
NewAggregator() SeriesQueryPerformanceMetricsAggregator
NewAggregator(tenant string) SeriesQueryPerformanceMetricsAggregator
}

type SeriesQueryPerformanceMetricsAggregator interface {
Expand All @@ -133,7 +136,7 @@ func (s *NoopSeriesStatsAggregator) Observe(_ float64) {}
// NoopSeriesStatsAggregatorFactory is a query performance series aggregator factory that does nothing.
type NoopSeriesStatsAggregatorFactory struct{}

func (s *NoopSeriesStatsAggregatorFactory) NewAggregator() SeriesQueryPerformanceMetricsAggregator {
func (s *NoopSeriesStatsAggregatorFactory) NewAggregator(tenant string) SeriesQueryPerformanceMetricsAggregator {
return &NoopSeriesStatsAggregator{}
}

Expand Down
121 changes: 118 additions & 3 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/matchers"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
Expand Down Expand Up @@ -799,13 +800,22 @@ func TestQueryStoreMetrics(t *testing.T) {
Deduplicate: true,
}, 10001)
testutil.Ok(t, err)

// query with a non-default tenant
instantQuery(t, ctx, querier.Endpoint("http"), func() string {
return "max_over_time(one_series{instance='foo_0'}[2h])"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 1)
testutil.Ok(t, err)
}

mon, err := e2emon.Start(e)
testutil.Ok(t, err)

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='100000',series_le='10000'}"
return "thanos_store_api_query_duration_seconds_count{samples_le='100000',series_le='10000',tenant='default-tenant'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -816,13 +826,14 @@ func TestQueryStoreMetrics(t *testing.T) {
"job": "querier-1",
"samples_le": "100000",
"series_le": "10000",
"tenant": "default-tenant",
},
Value: model.SampleValue(1),
},
})

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10'}"
return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10',tenant='default-tenant'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -833,13 +844,14 @@ func TestQueryStoreMetrics(t *testing.T) {
"job": "querier-1",
"samples_le": "100",
"series_le": "10",
"tenant": "default-tenant",
},
Value: model.SampleValue(1),
},
})

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='+Inf',series_le='+Inf'}"
return "thanos_store_api_query_duration_seconds_count{samples_le='+Inf',series_le='+Inf',tenant='default-tenant'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
Expand All @@ -850,6 +862,25 @@ func TestQueryStoreMetrics(t *testing.T) {
"job": "querier-1",
"samples_le": "+Inf",
"series_le": "+Inf",
"tenant": "default-tenant",
},
Value: model.SampleValue(1),
},
})

queryWaitAndAssert(t, ctx, mon.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return "thanos_store_api_query_duration_seconds_count{samples_le='100',series_le='10',tenant='test-tenant-1'}"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{
"__name__": "thanos_store_api_query_duration_seconds_count",
"instance": "storemetrics01-querier-1:8080",
"job": "querier-1",
"samples_le": "100",
"series_le": "10",
"tenant": "test-tenant-2",
},
Value: model.SampleValue(1),
},
Expand Down Expand Up @@ -2304,3 +2335,87 @@ func TestSidecarPrefersExtLabels(t *testing.T) {
Timestamp: model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()),
}}, retv)
}

func TestTenantHTTPMetrics(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("tenant-metrics")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// scrape the local prometheus, and our querier metrics
prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "alone", e2ethanos.DefaultPromConfig("prom-alone", 0, "", "", e2ethanos.LocalPrometheusTarget, "tenant-metrics-querier-1:8080"), "", e2ethanos.DefaultPrometheusImage(), "")

q := e2ethanos.NewQuerierBuilder(e, "1", sidecar1.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))

// Query once with default-tenant to ensure everything is ready
// for the following requests
instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "prometheus_api_remote_read_queries"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, 1)
testutil.Ok(t, err)

// Query a few times with tenant 1
instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "prometheus_api_remote_read_queries"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 1)
testutil.Ok(t, err)

instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "go_goroutines"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 2)
testutil.Ok(t, err)

instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "go_memstats_frees_total"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-1"}},
}, 2)
testutil.Ok(t, err)

// query just once with tenant-2
instantQuery(t, ctx, q.Endpoint("http"), func() string {
return "go_memstats_heap_alloc_bytes"
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
HTTPHeaders: map[string][]string{"thanos-tenant": {"test-tenant-2"}},
}, 2)
testutil.Ok(t, err)

// tenant-1 made 3 requests
tenant1Matcher, err := matchers.NewMatcher(matchers.MatchEqual, "tenant", "test-tenant-1")
testutil.Ok(t, err)
testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(3),
[]string{"http_requests_total"}, e2emon.WithLabelMatchers(
tenant1Matcher,
),
e2emon.WaitMissingMetrics(),
))

// tenant 2 just made one request
tenant2Matcher, err := matchers.NewMatcher(matchers.MatchEqual, "tenant", "test-tenant-2")
testutil.Ok(t, err)
testutil.Ok(t, q.WaitSumMetricsWithOptions(
e2emon.Equals(1),
[]string{"http_requests_total"}, e2emon.WithLabelMatchers(
tenant2Matcher,
),
e2emon.WaitMissingMetrics(),
))
}

0 comments on commit e3d727d

Please sign in to comment.