diff --git a/CHANGELOG.md b/CHANGELOG.md index 534d189ff8..3ea6d68094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. - [#5408](https://github.com/thanos-io/thanos/pull/5391) Receive: Add support for consistent hashrings. - [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Implement api/v1/status/tsdb. +- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: export metrics regarding size of remote write requests ### Changed diff --git a/pkg/extprom/http/instrument_server.go b/pkg/extprom/http/instrument_server.go index 18dcf4ccb3..300ba5be12 100644 --- a/pkg/extprom/http/instrument_server.go +++ b/pkg/extprom/http/instrument_server.go @@ -11,7 +11,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/uber/jaeger-client-go" ) @@ -37,81 +36,43 @@ func NewNopInstrumentationMiddleware() InstrumentationMiddleware { } type defaultInstrumentationMiddleware struct { - requestDuration *prometheus.HistogramVec - requestSize *prometheus.SummaryVec - requestsTotal *prometheus.CounterVec - responseSize *prometheus.SummaryVec + metrics *defaultMetrics } // NewInstrumentationMiddleware provides default InstrumentationMiddleware. // Passing nil as buckets uses the default buckets. func NewInstrumentationMiddleware(reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware { - if buckets == nil { - buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720} + return &defaultInstrumentationMiddleware{ + metrics: newDefaultMetrics(reg, buckets, []string{}), } - - ins := defaultInstrumentationMiddleware{ - requestDuration: promauto.With(reg).NewHistogramVec( - prometheus.HistogramOpts{ - Name: "http_request_duration_seconds", - Help: "Tracks the latencies for HTTP requests.", - Buckets: buckets, - }, - []string{"code", "handler", "method"}, - ), - - requestSize: promauto.With(reg).NewSummaryVec( - prometheus.SummaryOpts{ - Name: "http_request_size_bytes", - Help: "Tracks the size of HTTP requests.", - }, - []string{"code", "handler", "method"}, - ), - - requestsTotal: promauto.With(reg).NewCounterVec( - prometheus.CounterOpts{ - Name: "http_requests_total", - Help: "Tracks the number of HTTP requests.", - }, []string{"code", "handler", "method"}, - ), - - responseSize: promauto.With(reg).NewSummaryVec( - prometheus.SummaryOpts{ - Name: "http_response_size_bytes", - Help: "Tracks the size of HTTP responses.", - }, - []string{"code", "handler", "method"}, - ), - } - return &ins } // NewHandler wraps the given HTTP handler for instrumentation. It // registers four metric collectors (if not already done) and reports HTTP // metrics to the (newly or already) registered collectors: http_requests_total // (CounterVec), http_request_duration_seconds (Histogram), -// http_request_size_bytes (Summary), http_response_size_bytes (Summary). Each -// has a constant label named "handler" with the provided handlerName as -// value. http_requests_total is a metric vector partitioned by HTTP method -// (label name "method") and HTTP status code (label name "code"). +// http_request_size_bytes (Summary), http_response_size_bytes (Summary). +// Each has a constant label named "handler" with the provided handlerName as value. func (ins *defaultInstrumentationMiddleware) NewHandler(handlerName string, handler http.Handler) http.HandlerFunc { + baseLabels := prometheus.Labels{"handler": handlerName} + return httpInstrumentationHandler(baseLabels, ins.metrics, handler) +} + +func httpInstrumentationHandler(baseLabels prometheus.Labels, metrics *defaultMetrics, next http.Handler) http.HandlerFunc { return promhttp.InstrumentHandlerRequestSize( - ins.requestSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), + metrics.requestSize.MustCurryWith(baseLabels), promhttp.InstrumentHandlerCounter( - ins.requestsTotal.MustCurryWith(prometheus.Labels{"handler": handlerName}), + metrics.requestsTotal.MustCurryWith(baseLabels), promhttp.InstrumentHandlerResponseSize( - ins.responseSize.MustCurryWith(prometheus.Labels{"handler": handlerName}), + metrics.responseSize.MustCurryWith(baseLabels), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { now := time.Now() wd := &responseWriterDelegator{w: w} - handler.ServeHTTP(wd, r) + next.ServeHTTP(wd, r) - observer := ins.requestDuration.WithLabelValues( - wd.Status(), - handlerName, - strings.ToLower(r.Method), - ) + requestLabels := prometheus.Labels{"code": wd.Status(), "method": strings.ToLower(r.Method)} + observer := metrics.requestDuration.MustCurryWith(baseLabels).With(requestLabels) observer.Observe(time.Since(now).Seconds()) // If we find a tracingID we'll expose it as Exemplar. @@ -164,3 +125,12 @@ func (wd *responseWriterDelegator) StatusCode() int { func (wd *responseWriterDelegator) Status() string { return fmt.Sprintf("%d", wd.StatusCode()) } + +// NewInstrumentHandlerInflightTenant creates a middleware used to export the current amount of concurrent requests +// being handled. It has an optional tenant label whenever a tenant is present in the context. +func NewInstrumentHandlerInflightTenant(gaugeVec *prometheus.GaugeVec, tenantHeader string, next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tenant := r.Header.Get(tenantHeader) + promhttp.InstrumentHandlerInFlight(gaugeVec.With(prometheus.Labels{"tenant": tenant}), next).ServeHTTP(w, r) + } +} diff --git a/pkg/extprom/http/instrument_tenant_server.go b/pkg/extprom/http/instrument_tenant_server.go new file mode 100644 index 0000000000..0705717858 --- /dev/null +++ b/pkg/extprom/http/instrument_tenant_server.go @@ -0,0 +1,41 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package http + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" +) + +type tenantInstrumentationMiddleware struct { + metrics *defaultMetrics + tenantHeaderName string +} + +// NewTenantInstrumentationMiddleware provides the same instrumentation as defaultInstrumentationMiddleware, +// but with a tenant label fetched from the given tenantHeaderName header. +// Passing nil as buckets uses the default buckets. +func NewTenantInstrumentationMiddleware(tenantHeaderName string, reg prometheus.Registerer, buckets []float64) InstrumentationMiddleware { + return &tenantInstrumentationMiddleware{ + tenantHeaderName: tenantHeaderName, + metrics: newDefaultMetrics(reg, buckets, []string{"tenant"}), + } +} + +// NewHandler wraps the given HTTP handler for instrumentation. It +// registers four metric collectors (if not already done) and reports HTTP +// metrics to the (newly or already) registered collectors: http_requests_total +// (CounterVec), http_request_duration_seconds (Histogram), +// http_request_size_bytes (Summary), http_response_size_bytes (Summary). +// Each has a constant label named "handler" with the provided handlerName as value. +func (ins *tenantInstrumentationMiddleware) NewHandler(handlerName string, next http.Handler) http.HandlerFunc { + tenantWrapper := func(w http.ResponseWriter, r *http.Request) { + tenant := r.Header.Get(ins.tenantHeaderName) + baseLabels := prometheus.Labels{"handler": handlerName, "tenant": tenant} + handlerStack := httpInstrumentationHandler(baseLabels, ins.metrics, next) + handlerStack.ServeHTTP(w, r) + } + return tenantWrapper +} diff --git a/pkg/extprom/http/metrics.go b/pkg/extprom/http/metrics.go new file mode 100644 index 0000000000..24307fc087 --- /dev/null +++ b/pkg/extprom/http/metrics.go @@ -0,0 +1,54 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package http + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type defaultMetrics struct { + requestDuration *prometheus.HistogramVec + requestSize *prometheus.SummaryVec + requestsTotal *prometheus.CounterVec + responseSize *prometheus.SummaryVec +} + +func newDefaultMetrics(reg prometheus.Registerer, buckets []float64, extraLabels []string) *defaultMetrics { + if buckets == nil { + buckets = []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720} + } + + return &defaultMetrics{ + requestDuration: promauto.With(reg).NewHistogramVec( + prometheus.HistogramOpts{ + Name: "http_request_duration_seconds", + Help: "Tracks the latencies for HTTP requests.", + Buckets: buckets, + }, + append([]string{"code", "handler", "method"}, extraLabels...), + ), + requestSize: promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "http_request_size_bytes", + Help: "Tracks the size of HTTP requests.", + }, + append([]string{"code", "handler", "method"}, extraLabels...), + ), + requestsTotal: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Name: "http_requests_total", + Help: "Tracks the number of HTTP requests.", + }, + append([]string{"code", "handler", "method"}, extraLabels...), + ), + responseSize: promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Name: "http_response_size_bytes", + Help: "Tracks the size of HTTP responses.", + }, + append([]string{"code", "handler", "method"}, extraLabels...), + ), + } +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 418f8a93f1..8627d158d6 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -116,6 +116,9 @@ type Handler struct { forwardRequests *prometheus.CounterVec replications *prometheus.CounterVec replicationFactor prometheus.Gauge + + writeSamplesTotal *prometheus.HistogramVec + writeTimeseriesTotal *prometheus.HistogramVec } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -159,6 +162,24 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Help: "The number of times to replicate incoming write requests.", }, ), + writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "write_timeseries", + Help: "The number of timeseries received in the incoming write requests.", + Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000}, + }, []string{"code", "tenant"}, + ), + writeSamplesTotal: promauto.With(registerer).NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "write_samples", + Help: "The number of sampled received in the incoming write requests.", + Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000}, + }, []string{"code", "tenant"}, + ), } h.forwardRequests.WithLabelValues(labelSuccess) @@ -174,7 +195,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler { ins := extpromhttp.NewNopInstrumentationMiddleware() if o.Registry != nil { - ins = extpromhttp.NewInstrumentationMiddleware(o.Registry, + ins = extpromhttp.NewTenantInstrumentationMiddleware( + o.TenantHeader, + o.Registry, []float64{0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1, 0.25, 0.5, 0.75, 1, 2, 3, 4, 5}, ) } @@ -182,13 +205,24 @@ func NewHandler(logger log.Logger, o *Options) *Handler { readyf := h.testReady instrf := func(name string, next func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc { next = ins.NewHandler(name, http.HandlerFunc(next)) + if o.Tracer != nil { next = tracing.HTTPMiddleware(o.Tracer, name, logger, http.HandlerFunc(next)) } return next } - h.router.Post("/api/v1/receive", instrf("receive", readyf(middleware.RequestID(http.HandlerFunc(h.receiveHTTP))))) + h.router.Post( + "/api/v1/receive", + instrf( + "receive", + readyf( + middleware.RequestID( + http.HandlerFunc(h.receiveHTTP), + ), + ), + ), + ) statusAPI := statusapi.New(statusapi.Options{ GetStats: h.getStats, @@ -409,26 +443,30 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } - err = h.handleRequest(ctx, rep, tenant, &wreq) - if err != nil { + responseStatusCode := http.StatusOK + if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) + switch determineWriteErrorCause(err, 1) { + case errNotReady: + responseStatusCode = http.StatusServiceUnavailable + case errUnavailable: + responseStatusCode = http.StatusServiceUnavailable + case errConflict: + responseStatusCode = http.StatusConflict + case errBadReplica: + responseStatusCode = http.StatusBadRequest + default: + level.Error(tLogger).Log("err", err, "msg", "internal server error") + responseStatusCode = http.StatusInternalServerError + } + http.Error(w, err.Error(), responseStatusCode) } - - switch determineWriteErrorCause(err, 1) { - case nil: - return - case errNotReady: - http.Error(w, err.Error(), http.StatusServiceUnavailable) - case errUnavailable: - http.Error(w, err.Error(), http.StatusServiceUnavailable) - case errConflict: - http.Error(w, err.Error(), http.StatusConflict) - case errBadReplica: - http.Error(w, err.Error(), http.StatusBadRequest) - default: - level.Error(tLogger).Log("err", err, "msg", "internal server error") - http.Error(w, err.Error(), http.StatusInternalServerError) + h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries))) + totalSamples := 0 + for _, timeseries := range wreq.Timeseries { + totalSamples += len(timeseries.Samples) } + h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples)) } // forward accepts a write request, batches its time series by