Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log query response size in the query frontend (QFE) and querier #5288

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
}

t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
t.Cfg.Worker.TargetHeaders = t.Cfg.API.HTTPRequestHeadersToLog
return querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(internalQuerierRouter), util_log.Logger, prometheus.DefaultRegisterer)
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode)
f.reportQueryStats(r, queryString, queryResponseTime, stats, err, statusCode, resp)
}

if err != nil {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, query
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int) {
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.QueryStats, error error, statusCode int, resp *http.Response) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
Expand All @@ -252,6 +252,15 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

var (
contentLength int64
encoding string
)
if resp != nil {
contentLength = resp.ContentLength
encoding = resp.Header.Get("Content-Encoding")
}

// Log stats.
logMessage := append([]interface{}{
"msg", "query stats",
Expand All @@ -266,6 +275,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"fetched_chunks_bytes", numChunkBytes,
"fetched_data_bytes", numDataBytes,
"status_code", statusCode,
"response_size", contentLength,
}, stats.LoadExtraFields()...)

logMessage = append(logMessage, formatQueryString(queryString)...)
Expand All @@ -274,6 +284,10 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
logMessage = append(logMessage, grafanaFields...)
}

if len(encoding) > 0 {
logMessage = append(logMessage, "content_encoding", encoding)
}

if error != nil {
s, ok := status.FromError(error)
if !ok {
Expand Down
27 changes: 27 additions & 0 deletions pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ import (
"context"
"fmt"
"net/http"
"net/textproto"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util/backoff"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

var (
Expand All @@ -30,6 +33,7 @@ func newFrontendProcessor(cfg Config, handler RequestHandler, log log.Logger) pr
handler: handler,
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
targetHeaders: cfg.TargetHeaders,
}
}

Expand All @@ -40,6 +44,8 @@ type frontendProcessor struct {
querierID string

log log.Logger

targetHeaders []string
}

// notifyShutdown implements processor.
Expand Down Expand Up @@ -120,6 +126,24 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
}

headers := make(map[string]string, 0)
for _, h := range request.Headers {
headers[h.Key] = h.Values[0]
}
headerMap := make(map[string]string, 0)
// Remove non-existent header.
for _, header := range fp.targetHeaders {
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
headerMap[header] = v
}
}
orgID, ok := headers[textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName)]
if ok {
ctx = user.InjectOrgID(ctx, orgID)
}
ctx = util_log.ContextWithHeaderMap(ctx, headerMap)
logger := util_log.WithContext(ctx, fp.log)

response, err := fp.handler.Handle(ctx, request)
if err != nil {
var ok bool
Expand All @@ -131,6 +155,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
}
}
}
if statsEnabled {
level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody()))
}

// Ensure responses that are too big are not retried.
if len(response.Body) >= fp.maxMessageSize {
Expand Down
22 changes: 20 additions & 2 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"net/textproto"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -37,7 +38,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
maxMessageSize: cfg.GRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.GRPCClientConfig,

targetHeaders: cfg.TargetHeaders,
frontendClientRequestDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_querier_query_frontend_request_duration_seconds",
Help: "Time spend doing requests to frontend.",
Expand Down Expand Up @@ -70,6 +71,8 @@ type schedulerProcessor struct {

frontendPool *client.Pool
frontendClientRequestDuration *prometheus.HistogramVec

targetHeaders []string
}

// notifyShutdown implements processor.
Expand Down Expand Up @@ -130,6 +133,19 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
// We need to inject user into context for sending response back.
ctx := user.InjectOrgID(ctx, request.UserID)

headers := make(map[string]string, 0)
for _, h := range request.HttpRequest.Headers {
headers[h.Key] = h.Values[0]
}
headerMap := make(map[string]string, 0)
// Remove non-existent header.
for _, header := range sp.targetHeaders {
if v, ok := headers[textproto.CanonicalMIMEHeaderKey(header)]; ok {
headerMap[header] = v
}
}
ctx = util_log.ContextWithHeaderMap(ctx, headerMap)

tracer := opentracing.GlobalTracer()
// Ignore errors here. If we cannot get parent span, we just don't create new one.
parentSpanContext, _ := httpgrpcutil.GetParentSpanForRequest(tracer, request.HttpRequest)
Expand All @@ -140,7 +156,6 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
ctx = spanCtx
}
logger := util_log.WithContext(ctx, sp.log)

sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)

// Report back to scheduler that processing of the query has finished.
Expand Down Expand Up @@ -168,6 +183,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger,
}
}
}
if statsEnabled {
level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody()))
}

// Ensure responses that are too big are not retried.
if len(response.Body) >= sp.maxMessageSize {
Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Config struct {
QuerierID string `yaml:"id"`

GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

TargetHeaders []string `yaml:"-"` // Propagated by config.
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
Expand Down