diff --git a/CHANGELOG.md b/CHANGELOG.md
index a3fbb467f8f..1ac9730909d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
 ## main / unreleased
 * [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
+* [CHANGE] slo: include all request cancellations within SLO [#4355](https://github.com/grafana/tempo/pull/4355) (@electron0zero)
+  `tempo_query_frontend_queries_within_slo_total` is now more accurate by including all request cancellations within the SLO.
 * [CHANGE] update default config values to better align with production workloads [#4340](https://github.com/grafana/tempo/pull/4340) (@electron0zero)
 * [CHANGE] fix deprecation warning by switching to DoBatchWithOptions [#4343](https://github.com/grafana/tempo/pull/4343) (@dastrobu)
 * [CHANGE] **BREAKING CHANGE** The Tempo serverless is now deprecated and will be removed in an upcoming release [#4017](https://github.com/grafana/tempo/pull/4017/) @electron0zero
diff --git a/modules/frontend/combiner/common.go b/modules/frontend/combiner/common.go
index 7b73a93ac36..5fe6488b005 100644
--- a/modules/frontend/combiner/common.go
+++ b/modules/frontend/combiner/common.go
@@ -8,6 +8,7 @@ import (
 	"sync"
 
 	tempo_io "github.com/grafana/tempo/pkg/io"
+	"github.com/grafana/tempo/pkg/util"
 
 	"github.com/gogo/protobuf/jsonpb"
 	"github.com/gogo/protobuf/proto"
@@ -129,9 +130,14 @@ func (c *genericCombiner[T]) HTTPFinal() (*http.Response, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	httpErr, _ := c.erroredResponse()
-	if httpErr != nil {
-		return httpErr, nil
+	// TODO: confirm this fixes HTTPFinal returning a nil error for failed requests.
+	httpResp, grpcErr := c.erroredResponse()
+	if grpcErr != nil {
+		// TODO: grpcErr is a gRPC status error; consider whether it should be converted to an HTTP error before returning.
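+		// erroredResponse derives this status from the recorded HTTP status code (see the switch in erroredResponse below), e.g. codes.Canceled for HTTP 499.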
+		return nil, grpcErr
+	}
+	if httpResp != nil {
+		return httpResp, nil
 	}
 
 	final, err := c.finalize(c.current)
@@ -217,6 +223,9 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
 		grpcErr = status.Error(codes.ResourceExhausted, c.httpRespBody)
 	case http.StatusBadRequest:
 		grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody)
+	case util.StatusClientClosedRequest:
+		// HTTP 499 is mapped to the gRPC codes.Canceled error
+		grpcErr = status.Error(codes.Canceled, c.httpRespBody)
 	default:
 		if c.httpStatusCode/100 == 5 {
 			grpcErr = status.Error(codes.Internal, c.httpRespBody)
@@ -226,7 +235,7 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
 
 	httpResp := &http.Response{
 		StatusCode: c.httpStatusCode,
-		Status:     http.StatusText(c.httpStatusCode),
+		Status:     util.StatusText(c.httpStatusCode),
 		Body:       io.NopCloser(strings.NewReader(c.httpRespBody)),
 	}
 
diff --git a/modules/frontend/combiner/common_test.go b/modules/frontend/combiner/common_test.go
index 8bee69c1bb4..8c5a0abc054 100644
--- a/modules/frontend/combiner/common_test.go
+++ b/modules/frontend/combiner/common_test.go
@@ -13,10 +13,12 @@ import (
 	"github.com/gogo/status"
 	"github.com/grafana/tempo/pkg/api"
 	"github.com/grafana/tempo/pkg/tempopb"
+	"github.com/grafana/tempo/pkg/util"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc/codes"
 )
 
+// FIXME: add a test case here that exercises a canceled request context.
 func TestErroredResponse(t *testing.T) {
 	tests := []struct {
 		name         string
@@ -55,6 +57,17 @@ func TestErroredResponse(t *testing.T) {
 			},
 			expectedErr: status.Error(codes.InvalidArgument, "foo"),
 		},
+		{
+			name:       "499",
+			statusCode: util.StatusClientClosedRequest,
+			respBody:   "foo",
+			expectedResp: &http.Response{
+				StatusCode: util.StatusClientClosedRequest,
+				Status:     util.StatusTextClientClosedRequest,
+				Body:       io.NopCloser(strings.NewReader("foo")),
+			},
+			expectedErr: status.Error(codes.Canceled, "foo"),
+		},
 	}
 
 	for _, tc := range tests {
diff --git a/modules/frontend/combiner/trace_by_id.go b/modules/frontend/combiner/trace_by_id.go
index 27ba0948c38..2c5c62b5697 100644
--- a/modules/frontend/combiner/trace_by_id.go
+++ b/modules/frontend/combiner/trace_by_id.go
@@ -95,6 +95,7 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {
 	return err
 }
 
+// TODO: verify that this HTTPFinal returns errors correctly as well.
 func (c *traceByIDCombiner) HTTPFinal() (*http.Response, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
diff --git a/modules/frontend/handler.go b/modules/frontend/handler.go
index 4a9fbe2e3ff..4efd1124c5f 100644
--- a/modules/frontend/handler.go
+++ b/modules/frontend/handler.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/grafana/dskit/flagext"
+	"github.com/grafana/tempo/pkg/util"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 
@@ -22,16 +23,14 @@ import (
 )
 
 const (
-	// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
-	StatusClientClosedRequest = 499
 	// nil response in ServeHTTP
 	NilResponseError = "nil resp in ServeHTTP"
 )
 
 var (
-	errCanceled              = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
-	errDeadlineExceeded      = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
-	errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
+	errCanceled              = httpgrpc.Error(util.StatusClientClosedRequest, context.Canceled.Error())
+	errDeadlineExceeded      = httpgrpc.Error(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
+	errRequestEntityTooLarge = httpgrpc.Error(http.StatusRequestEntityTooLarge, "http: request body too large")
 )
 
 // handler exists to wrap a roundtripper with an HTTP handler. It wraps all
@@ -88,7 +87,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			logMessage = append(
 				logMessage,
 				"status", statusCode,
-				"err", err.Error(),
+				"error", err.Error(),
 				"response_size", 0,
 			)
 			level.Info(f.logger).Log(logMessage...)
@@ -161,14 +160,9 @@ func writeError(w http.ResponseWriter, err error) error {
 		err = errCanceled
 	} else if errors.Is(err, context.DeadlineExceeded) {
 		err = errDeadlineExceeded
-	} else if isRequestBodyTooLarge(err) {
+	} else if util.IsRequestBodyTooLarge(err) {
 		err = errRequestEntityTooLarge
 	}
 	httpgrpc.WriteError(w, err)
 	return err
 }
-
-// isRequestBodyTooLarge returns true if the error is "http: request body too large".
-func isRequestBodyTooLarge(err error) bool {
-	return err != nil && strings.Contains(err.Error(), "http: request body too large")
-}
diff --git a/modules/frontend/pipeline/collector_grpc.go b/modules/frontend/pipeline/collector_grpc.go
index cd63181f543..a512cda09d3 100644
--- a/modules/frontend/pipeline/collector_grpc.go
+++ b/modules/frontend/pipeline/collector_grpc.go
@@ -2,6 +2,7 @@ package pipeline
 
 import (
 	"context"
+	"errors"
 	"net/http"
 	"time"
 
@@ -27,7 +28,8 @@ func NewGRPCCollector[T combiner.TResponse](next AsyncRoundTripper[combiner.Pipe
 	}
 }
 
-// Handle
+// RoundTrip runs the request through the pipeline and streams the combined results to the gRPC client.
+// FIXME: RoundTrip should return an error when req.Context() is canceled.
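+// The sendDiffCb callback below checks req.Context(), so a client cancellation surfaces as a gRPC codes.Canceled status via grpcError.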
 func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
 	ctx := req.Context()
 	ctx, cancel := context.WithCancel(ctx) // create a new context with a cancel function
@@ -41,11 +43,19 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
 
 	lastUpdate := time.Now()
 
-	err = consumeAndCombineResponses(ctx, c.consumers, resps, c.combiner, func() error {
+	// sendDiffCb returns an error when the request context has been canceled;
+	// that error makes consumeAndCombineResponses exit early and is returned to the caller.
+	sendDiffCb := func() error {
 		// check if we should send an update
 		if time.Since(lastUpdate) > 500*time.Millisecond {
 			lastUpdate = time.Now()
 
+			// return early if the request context has an error, e.g. because the client canceled the request
+			// TODO: verify whether this check is needed or the error from c.combiner.GRPCDiff() is enough
+			if req.Context().Err() != nil {
+				return req.Context().Err()
+			}
+
 			// send a diff only during streaming
 			resp, err := c.combiner.GRPCDiff()
 			if err != nil {
@@ -58,7 +68,9 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
 		}
 
 		return nil
-	})
+	}
+
+	err = consumeAndCombineResponses(ctx, c.consumers, resps, c.combiner, sendDiffCb)
 	if err != nil {
 		return grpcError(err)
 	}
@@ -68,6 +80,12 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
 	if err != nil {
 		return grpcError(err)
 	}
+
+	// TODO: verify whether this check is needed or the error from c.combiner.GRPCDiff() is enough
+	if req.Context().Err() != nil {
+		return grpcError(req.Context().Err())
+	}
+
 	err = c.send(resp)
 	if err != nil {
 		return grpcError(err)
@@ -83,5 +101,11 @@ func grpcError(err error) error {
 		return err
 	}
 
+	// context cancellation maps to the gRPC Canceled status
+	if errors.Is(err, context.Canceled) {
+		return status.Error(codes.Canceled, err.Error())
+	}
+
+	// everything else maps to an internal server error
 	return status.Error(codes.Internal, err.Error())
 }
diff --git a/modules/frontend/pipeline/collector_http.go b/modules/frontend/pipeline/collector_http.go
index ae21e298a5a..d51834358d7 100644
--- a/modules/frontend/pipeline/collector_http.go
+++ b/modules/frontend/pipeline/collector_http.go
@@ -29,7 +29,8 @@ func NewHTTPCollector(next AsyncRoundTripper[combiner.PipelineResponse], consume
 	}
 }
 
-// Handle
+// RoundTrip implements the http.RoundTripper interface.
+// TODO: this should always return an error if req.Context() is canceled.
 func (r httpCollector) RoundTrip(req *http.Request) (*http.Response, error) {
 	ctx, cancel := context.WithCancel(req.Context())
 	defer cancel()
diff --git a/modules/frontend/search_handlers_test.go b/modules/frontend/search_handlers_test.go
index 05766eb78ba..56c719237d6 100644
--- a/modules/frontend/search_handlers_test.go
+++ b/modules/frontend/search_handlers_test.go
@@ -295,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
 	}()
 	grpcReq := &tempopb.SearchRequest{}
 	err := f.streamingSearch(grpcReq, srv)
-	require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
+	require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
 }
 
 func TestSearchLimitHonored(t *testing.T) {
diff --git a/modules/frontend/slos.go b/modules/frontend/slos.go
index e2e7c8cc918..f038bb02a84 100644
--- a/modules/frontend/slos.go
+++ b/modules/frontend/slos.go
@@ -1,10 +1,13 @@
 package frontend
 
 import (
+	"context"
+	"errors"
 	"net/http"
 	"time"
 
 	"github.com/gogo/status"
+	"github.com/grafana/tempo/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc/codes" @@ -89,11 +92,15 @@ func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec // most errors are SLO violations if err != nil { - // however, if this is a grpc resource exhausted error (429) or invalid argument (400) then we are within SLO + // However, gRPC resource exhausted error (429), invalid argument (400), not found (404) and + // request cancellations are considered within the SLO. switch status.Code(err) { - case codes.ResourceExhausted, - codes.InvalidArgument, - codes.NotFound: + case codes.ResourceExhausted, codes.InvalidArgument, codes.NotFound, codes.Canceled: + withinSLOByTenantCounter.WithLabelValues(tenant).Inc() + } + + // we don't always get a gRPC codes.Canceled status code, so check for context.Canceled and http 499 as well + if errors.Is(err, context.Canceled) || (resp != nil && resp.StatusCode == util.StatusClientClosedRequest) { withinSLOByTenantCounter.WithLabelValues(tenant).Inc() } return diff --git a/modules/frontend/slos_test.go b/modules/frontend/slos_test.go index c88481d73cc..f24b5bb201c 100644 --- a/modules/frontend/slos_test.go +++ b/modules/frontend/slos_test.go @@ -1,6 +1,7 @@ package frontend import ( + "context" "errors" "io" "net/http" @@ -8,6 +9,7 @@ import ( "testing" "time" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -165,3 +167,101 @@ func TestBadRequest(t *testing.T) { require.Equal(t, 1.0, actualAll) require.Equal(t, 1.0, actualSLO) } + +func TestCanceledRequest(t *testing.T) { + tests := []struct { + name string + statusCode int + err error + withInSLO bool + }{ + { + name: "random error with http response has 499 status code", + statusCode: util.StatusClientClosedRequest, + err: errors.New("foo"), + withInSLO: true, + }, + { + name: "context.Canceled error with http response has 499 status code", + statusCode: util.StatusClientClosedRequest, + err: context.Canceled, + withInSLO: true, + }, + { + name: "context.Canceled error with 500 status code", + statusCode: http.StatusInternalServerError, + err: context.Canceled, + withInSLO: true, + }, + { + name: "context.Canceled error with 200 status code", + statusCode: http.StatusOK, + err: context.Canceled, + withInSLO: true, + }, + { + name: "grpc codes.Canceled error with 500 status code", + statusCode: http.StatusInternalServerError, + err: status.Error(codes.Canceled, "foo"), + withInSLO: true, + }, + { + name: "grpc codes.Canceled error with 200 status code", + statusCode: http.StatusOK, + err: status.Error(codes.Canceled, "foo"), + withInSLO: true, + }, + { + name: "no error with 200 status code", + statusCode: http.StatusOK, + err: nil, + withInSLO: false, + }, + { + name: "no error with 500 status code", + statusCode: http.StatusInternalServerError, + err: nil, + withInSLO: false, + }, + { + name: "no error with http response has 499 status code", + statusCode: util.StatusClientClosedRequest, + err: nil, + withInSLO: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + allCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "all"}, []string{"tenant"}) + sloCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "slo"}, []string{"tenant"}) + throughputVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "throughput"}, []string{"tenant"}) + + hook := sloHook(allCounter, sloCounter, 
throughputVec, SLOConfig{ + DurationSLO: 10 * time.Second, + ThroughputBytesSLO: 100, + }) + + res := &http.Response{ + StatusCode: tt.statusCode, + Status: "context canceled", + Body: io.NopCloser(strings.NewReader("foo")), + } + + // latency is below DurationSLO threshold + hook(res, "tenant", 0, 15*time.Second, tt.err) + + actualAll, err := test.GetCounterValue(allCounter.WithLabelValues("tenant")) + require.NoError(t, err) + require.Equal(t, 1.0, actualAll) + + actualSLO, err := test.GetCounterValue(sloCounter.WithLabelValues("tenant")) + require.NoError(t, err) + if tt.withInSLO { + require.Equal(t, 1.0, actualSLO) + } else { + require.Equal(t, 0.0, actualSLO) + } + }) + } +} diff --git a/modules/frontend/tag_handlers_test.go b/modules/frontend/tag_handlers_test.go index 23f46147f83..77915e55b72 100644 --- a/modules/frontend/tag_handlers_test.go +++ b/modules/frontend/tag_handlers_test.go @@ -128,7 +128,7 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) { }() grpcReq := &tempopb.SearchTagsRequest{} err := f.streamingTagsV2(grpcReq, srv) - require.Equal(t, status.Error(codes.Internal, "context canceled"), err) + require.Equal(t, status.Error(codes.Canceled, "context canceled"), err) } func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) { @@ -160,7 +160,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) { TagName: "foo", } err := f.streamingTagValuesV2(grpcReq, srv) - require.Equal(t, status.Error(codes.Internal, "context canceled"), err) + require.Equal(t, status.Error(codes.Canceled, "context canceled"), err) } // todo: a lot of code is replicated between all of these "failure propagates from queriers" tests. we should refactor diff --git a/modules/querier/querier_query_range.go b/modules/querier/querier_query_range.go index bee13a3a155..cd2b18f7284 100644 --- a/modules/querier/querier_query_range.go +++ b/modules/querier/querier_query_range.go @@ -50,7 +50,7 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR } err = q.forGivenGenerators(ctx, replicationSet, forEach) if err != nil { - _ = level.Error(log.Logger).Log("error querying generators in Querier.queryRangeRecent", "err", err) + _ = level.Error(log.Logger).Log("msg", "error querying generators in Querier.queryRangeRecent", "err", err) return nil, fmt.Errorf("error querying generators in Querier.queryRangeRecent: %w", err) } diff --git a/pkg/util/errors.go b/pkg/util/errors.go index 626325ac51c..56c041ab0a4 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -3,34 +3,26 @@ package util import ( "errors" "fmt" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "net/http" ) var ( // ErrTraceNotFound can be used when we don't find a trace ErrTraceNotFound = errors.New("trace not found") - // ErrSearchKeyValueNotFound is used to indicate the requested key/value pair was not found. - ErrSearchKeyValueNotFound = errors.New("key/value not found") + // StatusClientClosedRequest is the status code for when a client request cancellation of an http request + StatusClientClosedRequest = 499 + StatusTextClientClosedRequest = "Request Cancelled" ErrUnsupported = fmt.Errorf("unsupported") ) -// IsConnCanceled returns true, if error is from a closed gRPC connection. 
-// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646
-func IsConnCanceled(err error) bool {
-	if err == nil {
-		return false
-	}
-
-	// >= gRPC v1.23.x
-	s, ok := status.FromError(err)
-	if ok {
-		// connection is canceled or server has already closed the connection
-		return s.Code() == codes.Canceled || s.Message() == "transport is closing"
+func StatusText(code int) string {
+	switch code {
+	case StatusClientClosedRequest:
+		// 499 doesn't have status text in http package, so we define it here
+		return StatusTextClientClosedRequest
+	default:
+		return http.StatusText(code)
 	}
-
-	return false
 }