From 8363d15509f0cf9849cacf4bba794153abbd2487 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 20:06:24 -0700 Subject: [PATCH 1/4] log final query response size in QFE Signed-off-by: Ben Ye enable response size message in processor Signed-off-by: Ben Ye --- pkg/cortex/modules.go | 1 + pkg/frontend/transport/handler.go | 19 +++++++++++++--- pkg/querier/worker/frontend_processor.go | 27 +++++++++++++++++++++++ pkg/querier/worker/scheduler_processor.go | 22 ++++++++++++++++-- pkg/querier/worker/worker.go | 2 ++ 5 files changed, 66 insertions(+), 5 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 9470b86042..aef9765148 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -346,6 +346,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { } t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent + t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer) } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 5259dd8574..4de12c66e2 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -66,6 +66,7 @@ type Handler struct { querySeries *prometheus.CounterVec queryChunkBytes *prometheus.CounterVec queryDataBytes *prometheus.CounterVec + responseBytes *prometheus.CounterVec activeUsers *util.ActiveUsersCleanupService } @@ -98,11 +99,17 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Size of all data fetched to execute a query in bytes.", }, []string{"user"}) + h.responseBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_response_size_bytes_total", + Help: "Size of response body size in bytes.", + }, []string{"user"}) + h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) h.querySeries.DeleteLabelValues(user) h.queryChunkBytes.DeleteLabelValues(user) h.queryDataBytes.DeleteLabelValues(user) + h.responseBytes.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. _ = h.activeUsers.StartAsync(context.Background()) @@ -162,11 +169,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { f.reportSlowQuery(r, queryString, queryResponseTime) } if f.cfg.QueryStatsEnabled { - var statusCode int + var ( + statusCode int + contentLength int64 + ) if err != nil { statusCode = getStatusCodeFromError(err) } else if resp != nil { statusCode = resp.StatusCode + contentLength = resp.ContentLength // If the response status code is not 2xx, try to get the // error message from response body. if resp.StatusCode/100 != 2 { @@ -177,7 +188,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode) + f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, contentLength) } if err != nil { @@ -232,7 +243,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) } -func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int) { +func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, contentLength int64) { tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { return @@ -250,6 +261,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes)) f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) + f.responseBytes.WithLabelValues(userID).Add(float64(contentLength)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) // Log stats. @@ -266,6 +278,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "fetched_chunks_bytes", numChunkBytes, "fetched_data_bytes", numDataBytes, "status_code", statusCode, + "response_size", contentLength, }, stats.LoadExtraFields()...) logMessage = append(logMessage, formatQueryString(queryString)...) diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 6120817666..1f9dddb90e 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -3,7 +3,10 @@ package worker import ( "context" "fmt" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/weaveworks/common/user" "net/http" + "net/textproto" "time" "github.com/go-kit/log" @@ -30,6 +33,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) pr handler: handler, maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, + targetHeaders: cfg.TargetHeaders, } } @@ -40,6 +44,8 @@ type frontendProcessor struct { querierID string log log.Logger + + targetHeaders []string } // notifyShutdown implements processor. @@ -120,6 +126,24 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H stats, ctx = querier_stats.ContextWithEmptyStats(ctx) } + headers := make(map[string]string, 0) + for _, h := range request.Headers { + headers[h.Key] = h.Values[0] + } + headerMap := make(map[string]string, 0) + // Remove non-existent header. + for _, header := range fp.targetHeaders { + if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok { + headerMap[header] = v + } + } + orgID, ok := headers[textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName)] + if ok { + ctx = user.InjectOrgID(ctx, orgID) + } + ctx = util_log.ContextWithHeaderMap(ctx, headerMap) + logger := util_log.WithContext(ctx, fp.log) + response, err := fp.handler.Handle(ctx, request) if err != nil { var ok bool @@ -131,6 +155,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H } } } + if statsEnabled { + level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) + } // Ensure responses that are too big are not retried. if len(response.Body) >= fp.maxMessageSize { diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 379c0bd8ec..d4e549e1a9 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "net/textproto" "time" "github.com/go-kit/log" @@ -37,7 +38,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, grpcConfig: cfg.GRPCClientConfig, - + targetHeaders: cfg.TargetHeaders, frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_querier_query_frontend_request_duration_seconds", Help: "Time spend doing requests to frontend.", @@ -70,6 +71,8 @@ type schedulerProcessor struct { frontendPool *client.Pool frontendClientRequestDuration *prometheus.HistogramVec + + targetHeaders []string } // notifyShutdown implements processor. @@ -130,6 +133,19 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer // We need to inject user into context for sending response back. ctx := user.InjectOrgID(ctx, request.UserID) + headers := make(map[string]string, 0) + for _, h := range request.HttpRequest.Headers { + headers[h.Key] = h.Values[0] + } + headerMap := make(map[string]string, 0) + // Remove non-existent header. + for _, header := range sp.targetHeaders { + if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok { + headerMap[header] = v + } + } + ctx = util_log.ContextWithHeaderMap(ctx, headerMap) + tracer := opentracing.GlobalTracer() // Ignore errors here. If we cannot get parent span, we just don't create new one. parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest) @@ -140,7 +156,6 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer ctx = spanCtx } logger := util_log.WithContext(ctx, sp.log) - sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest) // Report back to scheduler that processing of the query has finished. @@ -168,6 +183,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, } } } + if statsEnabled { + level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) + } // Ensure responses that are too big are not retried. if len(response.Body) >= sp.maxMessageSize { diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 49c807bce9..b93ff746db 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -31,6 +31,8 @@ type Config struct { QuerierID string `yaml:"id"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + + TargetHeaders []string `yaml:"-"` // Propagated by config. } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { From c5ec3d1f5209b7b14edfea8cce7473c4f011d069 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 22:19:34 -0700 Subject: [PATCH 2/4] fix import Signed-off-by: Ben Ye --- pkg/querier/worker/frontend_processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 1f9dddb90e..75483cb651 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -3,8 +3,6 @@ package worker import ( "context" "fmt" - util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/weaveworks/common/user" "net/http" "net/textproto" "time" @@ -12,12 +10,14 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/querier/stats" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util/backoff" + util_log "github.com/cortexproject/cortex/pkg/util/log" ) var ( From 7338aa94285da18af49f84c25869f1536b95c7e9 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Apr 2023 10:46:17 -0700 Subject: [PATCH 3/4] remove the response size counter metric Signed-off-by: Ben Ye --- pkg/frontend/transport/handler.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 4de12c66e2..644aae6e1b 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -66,7 +66,6 @@ type Handler struct { querySeries *prometheus.CounterVec queryChunkBytes *prometheus.CounterVec queryDataBytes *prometheus.CounterVec - responseBytes *prometheus.CounterVec activeUsers *util.ActiveUsersCleanupService } @@ -99,17 +98,11 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Size of all data fetched to execute a query in bytes.", }, []string{"user"}) - h.responseBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_query_response_size_bytes_total", - Help: "Size of response body size in bytes.", - }, []string{"user"}) - h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) h.querySeries.DeleteLabelValues(user) h.queryChunkBytes.DeleteLabelValues(user) h.queryDataBytes.DeleteLabelValues(user) - h.responseBytes.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. _ = h.activeUsers.StartAsync(context.Background()) @@ -261,7 +254,6 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) f.queryChunkBytes.WithLabelValues(userID).Add(float64(numChunkBytes)) f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) - f.responseBytes.WithLabelValues(userID).Add(float64(contentLength)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) // Log stats. From 8b737569bb63c011a2ed94791f6b2ff7847c7f4a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Apr 2023 10:52:31 -0700 Subject: [PATCH 4/4] update content encoding log Signed-off-by: Ben Ye --- pkg/frontend/transport/handler.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 644aae6e1b..dfe096f86a 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -162,15 +162,11 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { f.reportSlowQuery(r, queryString, queryResponseTime) } if f.cfg.QueryStatsEnabled { - var ( - statusCode int - contentLength int64 - ) + var statusCode int if err != nil { statusCode = getStatusCodeFromError(err) } else if resp != nil { statusCode = resp.StatusCode - contentLength = resp.ContentLength // If the response status code is not 2xx, try to get the // error message from response body. if resp.StatusCode/100 != 2 { @@ -181,7 +177,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, contentLength) + f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, resp) } if err != nil { @@ -236,7 +232,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) } -func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, contentLength int64) { +func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) { tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { return @@ -256,6 +252,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) + var ( + contentLength int64 + encoding string + ) + if resp != nil { + contentLength = resp.ContentLength + encoding = resp.Header.Get("Content-Encoding") + } + // Log stats. logMessage := append([]interface{}{ "msg", "query stats", @@ -279,6 +284,10 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer logMessage = append(logMessage, grafanaFields...) } + if len(encoding) > 0 { + logMessage = append(logMessage, "content_encoding", encoding) + } + if error != nil { s, ok := status.FromError(error) if !ok {