thanos-query-frontend: add --query-frontend.query-stats-enabled #9

Merged 1 commit on Oct 24, 2024
4 changes: 3 additions & 1 deletion cmd/thanos/query_frontend.go
@@ -145,6 +145,8 @@ func registerQueryFrontend(app *extkingpin.App) {

cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
cmd.Flag("query-frontend.query-stats-enabled", "True to enable query statistics tracking. "+
"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)

cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
" and both flags cannot be used at the same time. "+
@@ -311,7 +313,7 @@ func runQueryFrontend(
return err
}

roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
if err != nil {
return errors.Wrap(err, "setup downstream roundtripper")
}
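Note: the new flag defaults to false. A minimal query-frontend invocation that turns it on might look roughly like this (the addresses and downstream URL are illustrative):

    thanos query-frontend \
      --http-address=0.0.0.0:10902 \
      --query-frontend.downstream-url=http://thanos-querier:10902 \
      --query-frontend.query-stats-enabled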
16 changes: 12 additions & 4 deletions internal/cortex/frontend/downstream_roundtripper.go
@@ -13,17 +13,18 @@ import (

// RoundTripper that forwards requests to downstream URL.
type downstreamRoundTripper struct {
downstreamURL *url.URL
transport http.RoundTripper
downstreamURL *url.URL
transport http.RoundTripper
queryStatsEnabled bool
}

func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
u, err := url.Parse(downstreamURL)
if err != nil {
return nil, err
}

return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
}

func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
@@ -36,6 +37,13 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
}
}

if d.queryStatsEnabled {
// Add the stats query parameter so the downstream thanos-query returns query statistics for this request.
q := r.URL.Query()
q.Set("stats", "true")
r.URL.RawQuery = q.Encode()
}

r.URL.Scheme = d.downstreamURL.Scheme
r.URL.Host = d.downstreamURL.Host
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
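To illustrate the rewrite above (host and query are made up): with query stats enabled, an incoming request for /api/v1/query?query=up is forwarded downstream as http://thanos-querier:10902/api/v1/query?query=up&stats=true, with the scheme, host and path prefix taken from --query-frontend.downstream-url.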
142 changes: 72 additions & 70 deletions internal/cortex/frontend/transport/handler.go
@@ -6,25 +6,24 @@ package transport
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/util/stats"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"

querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
"github.com/thanos-io/thanos/internal/cortex/tenant"
"github.com/thanos-io/thanos/internal/cortex/util"
util_log "github.com/thanos-io/thanos/internal/cortex/util/log"
)
@@ -56,10 +55,9 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.HistogramVec
querySamplesTotal *prometheus.HistogramVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
@@ -71,25 +69,21 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger
}

if cfg.QueryStatsEnabled {
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_frontend_query_seconds",
Help: "Total amount of wall clock time spend processing queries.",
Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 360},
}, []string{"user"})

h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
h.querySamplesTotal = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_frontend_query_total_fetched_samples",
Help: "Number of samples touched to execute a query.",
Buckets: []float64{1, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000},
}, []string{"user"})

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
h.querySamplesTotal.DeleteLabelValues(user)
})
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
@@ -98,25 +92,23 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger
return h
}

type ResponseDataWithStats struct {
Stats *stats.BuiltinStats `json:"stats"`
}
type ResponseWithStats struct {
Data ResponseDataWithStats `json:"data"`
}

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.Stats
queryString url.Values
)

// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)
}

defer func() {
_ = r.Body.Close()
}()

// Buffer the body for later use to track slow queries.
// Buffer the request body for later use to track slow queries.
var buf bytes.Buffer
r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
@@ -135,17 +127,37 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
hs[h] = vs
}

w.WriteHeader(resp.StatusCode)

var respBuf bytes.Buffer
if f.cfg.QueryStatsEnabled {
writeServiceTimingHeader(queryResponseTime, hs, stats)
// Buffer the response body for query stat tracking later
resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
}

w.WriteHeader(resp.StatusCode)
// Log errors from copying the response body so they are visible even though a success status code has already been written.
bytesCopied, err := io.Copy(w, resp.Body)
if err != nil && !errors.Is(err, syscall.EPIPE) {
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
}

if f.cfg.QueryStatsEnabled {
// Parse the stats field out of the response body
var statsResponse ResponseWithStats
if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
if statsResponse.Data.Stats != nil {
queryString = f.parseRequestQueryString(r, buf)
f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Data.Stats)
} else {
// Don't fail the request if the stats are nil, just log a warning
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
}
} else {
// Don't fail the request if the stats could not be parsed from the response body, just log a warning
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
}
}

// Check whether we should parse the query string.
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
@@ -155,9 +167,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if shouldReportSlowQuery {
f.reportSlowQuery(r, hs, queryString, queryResponseTime)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, queryString, queryResponseTime, stats)
}
}

// reportSlowQuery reports slow queries.
@@ -194,38 +203,45 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *stats.BuiltinStats) {
remoteUser, _, _ := r.BasicAuth()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

// Log stats.
logMessage := append([]interface{}{
logMessage := []interface{}{
"msg", "query stats",
"component", "query-frontend",
"method", r.Method,
"path", r.URL.Path,
"remote_user", remoteUser,
"remote_addr", r.RemoteAddr,
"response_time", queryResponseTime,
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
}, formatQueryString(queryString)...)
"query_timings_preparation_time", stats.Timings.QueryPreparationTime,
"query_timings_eval_total_time", stats.Timings.EvalTotalTime,
"query_timings_exec_total_time", stats.Timings.ExecTotalTime,
"query_timings_exec_queue_time", stats.Timings.ExecQueueTime,
"query_timings_inner_eval_time", stats.Timings.InnerEvalTime,
"query_timings_result_sort_time", stats.Timings.ResultSortTime,
}
if stats.Samples != nil {
samples := stats.Samples

logMessage = append(logMessage, []interface{}{
"total_queryable_samples", samples.TotalQueryableSamples,
"peak_samples", samples.PeakSamples,
}...)
}

logMessage = append(logMessage, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)

// Record metrics.
if f.querySeconds != nil {
f.querySeconds.WithLabelValues(remoteUser).Observe(queryResponseTime.Seconds())
}
if f.querySamplesTotal != nil && stats.Samples != nil {
f.querySamplesTotal.WithLabelValues(remoteUser).Observe(float64(stats.Samples.TotalQueryableSamples))
}
}

func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
@@ -262,17 +278,3 @@ func writeError(w http.ResponseWriter, err error) {
}
server.WriteError(w, err)
}

func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
if stats != nil {
parts := make([]string, 0)
parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
parts = append(parts, statsValue("response_time", queryResponseTime))
headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
}
}

func statsValue(name string, d time.Duration) string {
durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64)
return name + ";dur=" + durationInMs
}
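The handler assumes the downstream querier exposes the Prometheus query API and, because the round tripper sets stats=true, returns built-in query statistics in the response body. An abridged response looks roughly like this (values are illustrative; the samples section may be absent, which is why the code checks it for nil):

    {
      "status": "success",
      "data": {
        "resultType": "vector",
        "result": [],
        "stats": {
          "timings": {
            "evalTotalTime": 0.049,
            "resultSortTime": 0,
            "queryPreparationTime": 0.001,
            "innerEvalTime": 0.048,
            "execQueueTime": 0.0001,
            "execTotalTime": 0.051
          },
          "samples": {
            "totalQueryableSamples": 12345,
            "peakSamples": 512
          }
        }
      }
    }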
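The two new histograms are labelled by user (the basic-auth remote user). Per-tenant latency percentiles can then be derived with a standard PromQL expression; for example, a sketch of a 90th-percentile latency query over the new metric:

    histogram_quantile(0.9, sum by (user, le) (rate(thanos_query_frontend_query_seconds_bucket[5m])))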