diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 9470b86042..aef9765148 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -346,6 +346,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { } t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent + t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer) } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 5259dd8574..dfe096f86a 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -177,7 +177,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode) + f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, resp) } if err != nil { @@ -232,7 +232,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) } -func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int) { +func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) { tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { return @@ -252,6 +252,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) + var ( + contentLength int64 + encoding string + ) + if resp != nil { + contentLength = resp.ContentLength + encoding = resp.Header.Get("Content-Encoding") + } + // Log stats. logMessage := append([]interface{}{ "msg", "query stats", @@ -266,6 +275,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "fetched_chunks_bytes", numChunkBytes, "fetched_data_bytes", numDataBytes, "status_code", statusCode, + "response_size", contentLength, }, stats.LoadExtraFields()...) logMessage = append(logMessage, formatQueryString(queryString)...) @@ -274,6 +284,10 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer logMessage = append(logMessage, grafanaFields...) } + if len(encoding) > 0 { + logMessage = append(logMessage, "content_encoding", encoding) + } + if error != nil { s, ok := status.FromError(error) if !ok { diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 6120817666..75483cb651 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -4,17 +4,20 @@ import ( "context" "fmt" "net/http" + "net/textproto" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb" "github.com/cortexproject/cortex/pkg/querier/stats" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util/backoff" + util_log "github.com/cortexproject/cortex/pkg/util/log" ) var ( @@ -30,6 +33,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) pr handler: handler, maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, + targetHeaders: cfg.TargetHeaders, } } @@ -40,6 +44,8 @@ type frontendProcessor struct { querierID string log log.Logger + + targetHeaders []string } // notifyShutdown implements processor. @@ -120,6 +126,24 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H stats, ctx = querier_stats.ContextWithEmptyStats(ctx) } + headers := make(map[string]string, 0) + for _, h := range request.Headers { + headers[h.Key] = h.Values[0] + } + headerMap := make(map[string]string, 0) + // Remove non-existent header. + for _, header := range fp.targetHeaders { + if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok { + headerMap[header] = v + } + } + orgID, ok := headers[textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName)] + if ok { + ctx = user.InjectOrgID(ctx, orgID) + } + ctx = util_log.ContextWithHeaderMap(ctx, headerMap) + logger := util_log.WithContext(ctx, fp.log) + response, err := fp.handler.Handle(ctx, request) if err != nil { var ok bool @@ -131,6 +155,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H } } } + if statsEnabled { + level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) + } // Ensure responses that are too big are not retried. if len(response.Body) >= fp.maxMessageSize { diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 379c0bd8ec..d4e549e1a9 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "net/textproto" "time" "github.com/go-kit/log" @@ -37,7 +38,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, grpcConfig: cfg.GRPCClientConfig, - + targetHeaders: cfg.TargetHeaders, frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_querier_query_frontend_request_duration_seconds", Help: "Time spend doing requests to frontend.", @@ -70,6 +71,8 @@ type schedulerProcessor struct { frontendPool *client.Pool frontendClientRequestDuration *prometheus.HistogramVec + + targetHeaders []string } // notifyShutdown implements processor. @@ -130,6 +133,19 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer // We need to inject user into context for sending response back. ctx := user.InjectOrgID(ctx, request.UserID) + headers := make(map[string]string, 0) + for _, h := range request.HttpRequest.Headers { + headers[h.Key] = h.Values[0] + } + headerMap := make(map[string]string, 0) + // Remove non-existent header. + for _, header := range sp.targetHeaders { + if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok { + headerMap[header] = v + } + } + ctx = util_log.ContextWithHeaderMap(ctx, headerMap) + tracer := opentracing.GlobalTracer() // Ignore errors here. If we cannot get parent span, we just don't create new one. parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest) @@ -140,7 +156,6 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer ctx = spanCtx } logger := util_log.WithContext(ctx, sp.log) - sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest) // Report back to scheduler that processing of the query has finished. @@ -168,6 +183,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, } } } + if statsEnabled { + level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) + } // Ensure responses that are too big are not retried. if len(response.Body) >= sp.maxMessageSize { diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 49c807bce9..b93ff746db 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -31,6 +31,8 @@ type Config struct { QuerierID string `yaml:"id"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` + + TargetHeaders []string `yaml:"-"` // Propagated by config. } func (cfg *Config) RegisterFlags(f *flag.FlagSet) {