Better logql metric status code. #1718

Merged: 4 commits, Feb 24, 2020
2 changes: 1 addition & 1 deletion pkg/logql/ast.go
@@ -179,7 +179,7 @@ func (e *filterExpr) logQLExpr() {}
func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
m, err := labels.NewMatcher(t, n, v)
if err != nil {
panic(err)
panic(newParseError(err.Error(), 0, 0))
}
return m
}
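
mustNewMatcher signals failure by panicking, since it is used where an error cannot be returned directly; wrapping the error in a ParseError lets the recover in ParseExpr (see pkg/logql/parser.go below) surface it as a parse error rather than an internal one. A minimal, self-contained sketch of that panic/recover hand-off, using simplified stand-in types rather than the real logql ones:

package main

import "fmt"

// parseError is a simplified stand-in for logql.ParseError.
type parseError struct{ msg string }

func (e parseError) Error() string { return e.msg }

// mustMatcher mirrors mustNewMatcher: helpers that cannot return an error
// panic with a parseError instead.
func mustMatcher(valid bool) string {
	if !valid {
		panic(parseError{msg: "invalid matcher"})
	}
	return "matcher"
}

// parse mirrors ParseExpr: it recovers the panic and returns it as an error.
func parse(valid bool) (expr string, err error) {
	defer func() {
		if r := recover(); r != nil {
			if e, ok := r.(error); ok {
				err = e
				return
			}
			panic(r)
		}
	}()
	return mustMatcher(valid), nil
}

func main() {
	_, err := parse(false)
	fmt.Printf("%T: %v\n", err, err) // main.parseError: invalid matcher
}
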
7 changes: 5 additions & 2 deletions pkg/logql/engine.go
@@ -126,9 +126,12 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
statResult = stats.Snapshot(ctx, time.Since(start))
statResult.Log(level.Debug(log))

status := "success"
status := "200"
if err != nil {
status = "error"
status = "500"
if IsParseError(err) {
status = "400"
}
}
RecordMetrics(status, q.String(), rangeType, statResult)

31 changes: 22 additions & 9 deletions pkg/logql/metrics.go
@@ -13,30 +13,35 @@ const (
QueryTypeMetric = "metric"
QueryTypeFilter = "filter"
QueryTypeLimited = "limited"

latencyTypeSlow = "slow"
latencyTypeFast = "fast"

slowQueryThresholdSecond = float64(10)
)

var (
bytesPerSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "logql_querystats_bytes_processed_per_seconds",
Help: "Distribution of bytes processed per seconds for LogQL queries.",
// 0 MB 40 MB 80 MB 160 MB 320 MB 640 MB 1.3 GB 2.6 GB 5.1 GB 10 GB
Buckets: prometheus.ExponentialBuckets(20*1e6, 2, 10),
}, []string{"status", "type", "range"})
// 50MB 100MB 400MB 600MB 800MB 1GB 2GB 3GB 4GB 5GB 6GB 7GB 8GB 9GB 10GB 15GB 20GB
Buckets: []float64{50 * 1e6, 100 * 1e6, 400 * 1e6, 600 * 1e6, 800 * 1e6, 1 * 1e9, 2 * 1e9, 3 * 1e9, 4 * 1e9, 5 * 1e9, 6 * 1e9, 7 * 1e9, 8 * 1e9, 9 * 1e9, 10 * 1e9, 15 * 1e9, 20 * 1e9},
}, []string{"status_code", "type", "range", "latency_type"})
execLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "logql_querystats_latency_seconds",
Help: "Distribution of latency for LogQL queries.",
// 0.25 0.5 1 2 4 8 16 32 64 128
Buckets: prometheus.ExponentialBuckets(0.250, 2, 10),
}, []string{"status", "type", "range"})
}, []string{"status_code", "type", "range"})
chunkDownloadLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "logql_querystats_chunk_download_latency_seconds",
Help: "Distribution of chunk downloads latency for LogQL queries.",
// 0.125 0.25 0.5 1 2 4 8 16 32 64
Buckets: prometheus.ExponentialBuckets(0.125, 2, 10),
}, []string{"status", "type", "range"})
// 0.25 0.5 1 2 4 8 16 32 64 128
Buckets: prometheus.ExponentialBuckets(0.250, 2, 10),
}, []string{"status_code", "type", "range"})
duplicatesTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "logql_querystats_duplicates_total",
@@ -46,7 +51,7 @@ var (
Namespace: "loki",
Name: "logql_querystats_downloaded_chunk_total",
Help: "Total count of chunks downloaded found while executing LogQL queries.",
}, []string{"status", "type", "range"})
}, []string{"status_code", "type", "range"})
ingesterLineTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "logql_querystats_ingester_sent_lines_total",
@@ -60,7 +65,15 @@ func RecordMetrics(status, query string, rangeType QueryRangeType, stats stats.R
level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(rangeType)
bytesPerSeconds.WithLabelValues(status, queryType, rt).

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
latencyType := latencyTypeFast
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}

bytesPerSeconds.WithLabelValues(status, queryType, rt, latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSeconds))
execLatency.WithLabelValues(status, queryType, rt).
Observe(stats.Summary.ExecTime)
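
The throughput histogram is now additionally tagged with a latency_type label so that slow and fast queries can be separated when graphing bytes processed per second. A minimal, self-contained sketch of the classification rule (the 10s threshold and label values mirror the constants in this file; this is an illustration, not the PR's code):

package main

import "fmt"

// slowQueryThresholdSecond mirrors the constant added in pkg/logql/metrics.go.
const slowQueryThresholdSecond = float64(10)

// latencyType classifies a query by its execution time: anything above the
// threshold is labelled "slow", everything else "fast".
func latencyType(execTimeSeconds float64) string {
	if execTimeSeconds > slowQueryThresholdSecond {
		return "slow"
	}
	return "fast"
}

func main() {
	fmt.Println(latencyType(0.8))  // fast
	fmt.Println(latencyType(42.0)) // slow
}
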
11 changes: 10 additions & 1 deletion pkg/logql/parser.go
@@ -24,7 +24,10 @@ func ParseExpr(input string) (expr Expr, err error) {
if r != nil {
var ok bool
if err, ok = r.(error); ok {
return
if IsParseError(err) {
return
}
err = newParseError(err.Error(), 0, 0)
}
}
}()
@@ -89,3 +92,9 @@ func newParseError(msg string, line, col int) ParseError {
col: col,
}
}

// IsParseError returns true if the err is an AST parsing error.
func IsParseError(err error) bool {
_, ok := err.(ParseError)
return ok
}
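
With ParseExpr now guaranteed to return a ParseError for any parsing failure, callers can use the exported IsParseError helper to decide between a client error and a server error. A minimal sketch of the caller side (hypothetical main package; the 200/400/500 mapping mirrors the engine and querier changes in this PR):

package main

import (
	"errors"
	"fmt"

	"github.com/grafana/loki/pkg/logql"
)

// statusFor mirrors the mapping used in query.Exec and the HTTP handlers:
// nil → "200", parse errors → "400", anything else → "500".
func statusFor(err error) string {
	switch {
	case err == nil:
		return "200"
	case logql.IsParseError(err):
		return "400"
	default:
		return "500"
	}
}

func main() {
	_, parseErr := logql.ParseExpr(`{foo`) // malformed selector
	fmt.Println(statusFor(nil))                   // 200
	fmt.Println(statusFor(parseErr))              // 400
	fmt.Println(statusFor(errors.New("timeout"))) // 500
}
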
33 changes: 33 additions & 0 deletions pkg/logql/parser_test.go
@@ -1,6 +1,7 @@
package logql

import (
"errors"
"reflect"
"testing"
"time"
@@ -893,3 +894,35 @@ func TestParseMatchers(t *testing.T) {
})
}
}

func TestIsParseError(t *testing.T) {

tests := []struct {
name string
errFn func() error
want bool
}{
{
"bad query",
func() error {
_, err := ParseExpr(`{foo`)
return err
},
true,
},
{
"other error",
func() error {
return errors.New("")
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsParseError(tt.errFn()); got != tt.want {
t.Errorf("IsParseError() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -323,7 +323,7 @@ func (t *Loki) initQueryFrontend() (err error) {
t.frontend.Wrap(tripperware)
frontend.RegisterFrontendServer(t.server.GRPC, t.frontend)

frontendHandler := t.httpAuthMiddleware.Wrap(t.frontend.Handler())
frontendHandler := queryrange.StatsHTTPMiddleware.Wrap(t.httpAuthMiddleware.Wrap(t.frontend.Handler()))
t.server.HTTP.Handle("/loki/api/v1/query_range", frontendHandler)
t.server.HTTP.Handle("/loki/api/v1/query", frontendHandler)
t.server.HTTP.Handle("/loki/api/v1/label", frontendHandler)
18 changes: 15 additions & 3 deletions pkg/querier/http.go
@@ -44,7 +44,11 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
if logql.IsParseError(err) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

@@ -68,7 +72,11 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
query := q.engine.NewInstantQuery(request.Query, request.Ts, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
if logql.IsParseError(err) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

@@ -114,7 +122,11 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
if logql.IsParseError(err) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

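
The same two-branch mapping now appears in RangeQueryHandler, InstantQueryHandler and LogQueryHandler. A hypothetical helper (not part of this PR) that would centralize it, shown only to illustrate the intended behaviour:

package querier

import (
	"net/http"

	"github.com/grafana/loki/pkg/logql"
)

// writeError maps query errors onto HTTP status codes: parse errors become
// 400 Bad Request, everything else 500 Internal Server Error.
func writeError(w http.ResponseWriter, err error) {
	if logql.IsParseError(err) {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	http.Error(w, err.Error(), http.StatusInternalServerError)
}
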
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/roundtrip.go
@@ -91,7 +91,7 @@ func NewLogFilterTripperware(
limits Limits,
codec queryrange.Codec,
) (frontend.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsMiddleware(), queryrange.LimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.LimitsMiddleware(limits)}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(limits, codec))
}
@@ -115,7 +115,7 @@ func NewMetricTripperware(
extractor queryrange.Extractor,
) (frontend.Tripperware, Stopper, error) {

queryRangeMiddleware := []queryrange.Middleware{StatsMiddleware(), queryrange.LimitsMiddleware(limits)}
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.LimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
102 changes: 84 additions & 18 deletions pkg/querier/queryrange/stats.go
@@ -1,23 +1,35 @@
package queryrange

import (
"bufio"
"context"
"fmt"
"net"
"net/http"
"strconv"
"time"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/middleware"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)

var defaultMetricRecorder = metricRecorderFn(func(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
logql.RecordMetrics(status, query, rangeType, stats)
})
type ctxKeyType string

const ctxKey ctxKeyType = "stats"

var (
defaultMetricRecorder = metricRecorderFn(func(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
logql.RecordMetrics(status, query, rangeType, stats)
})
// StatsHTTPMiddleware is an HTTP middleware that records stats for query_range queries.
StatsHTTPMiddleware middleware.Interface = statsHTTPMiddleware(defaultMetricRecorder)
)

type metricRecorder interface {
Record(status, query string, rangeType logql.QueryRangeType, stats stats.Result)
@@ -29,13 +41,41 @@ func (m metricRecorderFn) Record(status, query string, rangeType logql.QueryRang
m(status, query, rangeType, stats)
}

// StatsMiddleware creates a new Middleware that recompute the stats summary based on the actual duration of the request.
// The middleware also register Prometheus metrics.
func StatsMiddleware() queryrange.Middleware {
return statsMiddleware(defaultMetricRecorder)
type queryData struct {
query string
statistics *stats.Result
rangeType logql.QueryRangeType
recorded bool
}

func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := &queryData{}
interceptor := &interceptor{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(
interceptor,
r.WithContext(context.WithValue(r.Context(), ctxKey, data)),
)
// HTTP middleware runs for every HTTP request,
// but we only want to record query_range requests.
if data.recorded {
if data.statistics == nil {
data.statistics = &stats.Result{}
}
recorder.Record(
strconv.Itoa(interceptor.statusCode),
data.query,
data.rangeType,
*data.statistics,
)
}
})
})
}

func statsMiddleware(recorder metricRecorder) queryrange.Middleware {
// StatsCollectorMiddleware computes the stats summary based on the actual duration of the request and injects it into the request context.
func StatsCollectorMiddleware() queryrange.Middleware {
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
logger := spanlogger.FromContext(ctx)
@@ -46,31 +86,57 @@ func statsMiddleware(recorder metricRecorder) queryrange.Middleware {

// collect stats and status
var statistics *stats.Result
var status string
if resp != nil {
switch r := resp.(type) {
case *LokiResponse:
statistics = &r.Statistics
status = r.Status
case *LokiPromResponse:
statistics = &r.Statistics
if r.Response != nil {
status = r.Response.Status
}
default:
level.Warn(util.Logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
}
}
if err != nil {
status = loghttp.QueryStatusFail
}
if statistics != nil {
// Re-calculate the summary then log and record metrics for the current query
statistics.ComputeSummary(time.Since(start))
statistics.Log(logger)
recorder.Record(status, req.GetQuery(), logql.GetRangeType(paramsFromRequest(req)), *statistics)
}
ctxValue := ctx.Value(ctxKey)
if data, ok := ctxValue.(*queryData); ok {
data.recorded = true
data.query = req.GetQuery()
data.statistics = statistics
data.rangeType = logql.GetRangeType(paramsFromRequest(req))
}
return resp, err
})
})
}

// interceptor implements WriteHeader to intercept status codes. WriteHeader
// may not be called on success, so initialize statusCode with the status you
// want to report on success, i.e. http.StatusOK.
//
// interceptor also implements net.Hijacker, to let the downstream Handler
// hijack the connection. This is needed, for example, for working with websockets.
type interceptor struct {
http.ResponseWriter
statusCode int
recorded bool
}

func (i *interceptor) WriteHeader(code int) {
if !i.recorded {
i.statusCode = code
i.recorded = true
}
i.ResponseWriter.WriteHeader(code)
}

func (i *interceptor) Hijack() (net.Conn, *bufio.ReadWriter, error) {
hj, ok := i.ResponseWriter.(http.Hijacker)
if !ok {
return nil, nil, fmt.Errorf("interceptor: can't cast parent ResponseWriter to Hijacker")
}
return hj.Hijack()
}
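
Putting the stats.go pieces together: StatsHTTPMiddleware stores a *queryData in the request context and wraps the ResponseWriter in the interceptor; StatsCollectorMiddleware, running inside the query-range tripperware, fills that queryData in; once the handler returns, the HTTP middleware records the metrics with the real HTTP status captured by the interceptor. A small, self-contained sketch of the status-capturing part (the interceptor type is restated so the snippet runs on its own; the handler is hypothetical):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// interceptor records the first status code written by the wrapped handler,
// mirroring the type defined in pkg/querier/queryrange/stats.go.
type interceptor struct {
	http.ResponseWriter
	statusCode int
	recorded   bool
}

func (i *interceptor) WriteHeader(code int) {
	if !i.recorded {
		i.statusCode = code
		i.recorded = true
	}
	i.ResponseWriter.WriteHeader(code)
}

func main() {
	// A hypothetical handler that fails, so WriteHeader is called with 500.
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "query failed", http.StatusInternalServerError)
	})

	rec := httptest.NewRecorder()
	i := &interceptor{ResponseWriter: rec, statusCode: http.StatusOK}
	handler.ServeHTTP(i, httptest.NewRequest(http.MethodGet, "/loki/api/v1/query_range", nil))

	// The middleware would now label metrics with this captured status code.
	fmt.Println(i.statusCode) // 500
}
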