Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slo: include all request cancellations within SLO #4355

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## main / unreleased
* [FEATURE] tempo-cli: support dropping multiple traces in a single operation [#4266](https://github.com/grafana/tempo/pull/4266) (@ndk)
* [CHANGE] slo: include all request cancellations within SLO [#4355](https://github.com/grafana/tempo/pull/4355) (@electron0zero)
`tempo_query_frontend_queries_within_slo_total` is now more accurate by including all request cancellations within the SLO.
* [CHANGE] update default config values to better align with production workloads [#4340](https://github.com/grafana/tempo/pull/4340) (@electron0zero)
* [CHANGE] fix deprecation warning by switching to DoBatchWithOptions [#4343](https://github.com/grafana/tempo/pull/4343) (@dastrobu)
* [CHANGE] **BREAKING CHANGE** The Tempo serverless is now deprecated and will be removed in an upcoming release [#4017](https://github.com/grafana/tempo/pull/4017/) @electron0zero
Expand Down
17 changes: 13 additions & 4 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/util"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -129,9 +130,14 @@ func (c *genericCombiner[T]) HTTPFinal() (*http.Response, error) {
c.mu.Lock()
defer c.mu.Unlock()

httpErr, _ := c.erroredResponse()
if httpErr != nil {
return httpErr, nil
// TODO: verify if this fixes the issue of HTTPFinal returning nil error??
httpResp, grpcErr := c.erroredResponse()
if grpcErr != nil {
// TODO: it's a gRPC error, maybe not a good idea to return it as is, turn into the HTTP error??
return nil, grpcErr
}
if httpResp != nil {
return httpResp, nil
}

final, err := c.finalize(c.current)
Expand Down Expand Up @@ -217,6 +223,9 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
grpcErr = status.Error(codes.ResourceExhausted, c.httpRespBody)
case http.StatusBadRequest:
grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody)
case util.StatusClientClosedRequest:
// HTTP 499 is mapped to codes.Canceled grpc error
grpcErr = status.Error(codes.Canceled, c.httpRespBody)
default:
if c.httpStatusCode/100 == 5 {
grpcErr = status.Error(codes.Internal, c.httpRespBody)
Expand All @@ -226,7 +235,7 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
}
httpResp := &http.Response{
StatusCode: c.httpStatusCode,
Status: http.StatusText(c.httpStatusCode),
Status: util.StatusText(c.httpStatusCode),
Body: io.NopCloser(strings.NewReader(c.httpRespBody)),
}

Expand Down
13 changes: 13 additions & 0 deletions modules/frontend/combiner/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"github.com/gogo/status"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
)

// FIXME: add a test here for canceling the context
func TestErroredResponse(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -55,6 +57,17 @@ func TestErroredResponse(t *testing.T) {
},
expectedErr: status.Error(codes.InvalidArgument, "foo"),
},
{
name: "499",
statusCode: util.StatusClientClosedRequest,
respBody: "foo",
expectedResp: &http.Response{
StatusCode: util.StatusClientClosedRequest,
Status: util.StatusTextClientClosedRequest,
Body: io.NopCloser(strings.NewReader("foo")),
},
expectedErr: status.Error(codes.Canceled, "foo"),
},
}

for _, tc := range tests {
Expand Down
1 change: 1 addition & 0 deletions modules/frontend/combiner/trace_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {
return err
}

// TODO: verify if we return error correctly here??
func (c *traceByIDCombiner) HTTPFinal() (*http.Response, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
18 changes: 6 additions & 12 deletions modules/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/grafana/dskit/flagext"
"github.com/grafana/tempo/pkg/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -22,16 +23,14 @@ import (
)

const (
// StatusClientClosedRequest is the status code for when a client request cancellation of an http request
StatusClientClosedRequest = 499
// nil response in ServeHTTP
NilResponseError = "nil resp in ServeHTTP"
)

var (
errCanceled = httpgrpc.Errorf(StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Errorf(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
errCanceled = httpgrpc.Error(util.StatusClientClosedRequest, context.Canceled.Error())
errDeadlineExceeded = httpgrpc.Error(http.StatusGatewayTimeout, context.DeadlineExceeded.Error())
errRequestEntityTooLarge = httpgrpc.Error(http.StatusRequestEntityTooLarge, "http: request body too large")
)

// handler exists to wrap a roundtripper with an HTTP handler. It wraps all
Expand Down Expand Up @@ -88,7 +87,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logMessage = append(
logMessage,
"status", statusCode,
"err", err.Error(),
"error", err.Error(),
"response_size", 0,
)
level.Info(f.logger).Log(logMessage...)
Expand Down Expand Up @@ -161,14 +160,9 @@ func writeError(w http.ResponseWriter, err error) error {
err = errCanceled
} else if errors.Is(err, context.DeadlineExceeded) {
err = errDeadlineExceeded
} else if isRequestBodyTooLarge(err) {
} else if util.IsRequestBodyTooLarge(err) {
err = errRequestEntityTooLarge
}
httpgrpc.WriteError(w, err)
return err
}

// isRequestBodyTooLarge returns true if the error is "http: request body too large".
func isRequestBodyTooLarge(err error) bool {
return err != nil && strings.Contains(err.Error(), "http: request body too large")
}
30 changes: 27 additions & 3 deletions modules/frontend/pipeline/collector_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"context"
"errors"
"net/http"
"time"

Expand All @@ -27,7 +28,8 @@ func NewGRPCCollector[T combiner.TResponse](next AsyncRoundTripper[combiner.Pipe
}
}

// Handle
// RoundTrip implements the http.RoundTripper interface
// FIXME: this RoundTrip should return an error if the req.Context is cancelled???
func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
ctx := req.Context()
ctx, cancel := context.WithCancel(ctx) // create a new context with a cancel function
Expand All @@ -41,11 +43,19 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {

lastUpdate := time.Now()

err = consumeAndCombineResponses(ctx, c.consumers, resps, c.combiner, func() error {
// sendDiffCb should return an error if the context is cancelled,
// callback's error is used to exit early from the loop and return the error to the caller
sendDiffCb := func() error {
// check if we should send an update
if time.Since(lastUpdate) > 500*time.Millisecond {
lastUpdate = time.Now()

// check and return the request context has an error, maybe it was cancelled, etc
// TODO: verify if we need this error check? check if err from c.combiner.GRPCDiff() enough??
if req.Context().Err() != nil {
return req.Context().Err()
}

// send a diff only during streaming
resp, err := c.combiner.GRPCDiff()
if err != nil {
Expand All @@ -58,7 +68,9 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
}

return nil
})
}

err = consumeAndCombineResponses(ctx, c.consumers, resps, c.combiner, sendDiffCb)
if err != nil {
return grpcError(err)
}
Expand All @@ -68,6 +80,12 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {
if err != nil {
return grpcError(err)
}

// TODO: verify if we need this error check? check if err from c.combiner.GRPCDiff() enough??
if req.Context().Err() != nil {
return grpcError(req.Context().Err())
}

err = c.send(resp)
if err != nil {
return grpcError(err)
Expand All @@ -83,5 +101,11 @@ func grpcError(err error) error {
return err
}

// if this is context cancelled, we return a grpc cancelled error
if errors.Is(err, context.Canceled) {
return status.Error(codes.Canceled, err.Error())
}

// rest all fall into internal server error
return status.Error(codes.Internal, err.Error())
}
3 changes: 2 additions & 1 deletion modules/frontend/pipeline/collector_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func NewHTTPCollector(next AsyncRoundTripper[combiner.PipelineResponse], consume
}
}

// Handle
// RoundTrip implements the http.RoundTripper interface
// TODO: this should always return an error if the req.Context is cancelled
func (r httpCollector) RoundTrip(req *http.Request) (*http.Response, error) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func runnerClientCancelContext(t *testing.T, f *QueryFrontend) {
}()
grpcReq := &tempopb.SearchRequest{}
err := f.streamingSearch(grpcReq, srv)
require.Equal(t, status.Error(codes.Internal, "context canceled"), err)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}

func TestSearchLimitHonored(t *testing.T) {
Expand Down
15 changes: 11 additions & 4 deletions modules/frontend/slos.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package frontend

import (
"context"
"errors"
"net/http"
"time"

"github.com/gogo/status"
"github.com/grafana/tempo/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -89,11 +92,15 @@ func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec

// most errors are SLO violations
if err != nil {
// however, if this is a grpc resource exhausted error (429) or invalid argument (400) then we are within SLO
// However, gRPC resource exhausted error (429), invalid argument (400), not found (404) and
// request cancellations are considered within the SLO.
switch status.Code(err) {
case codes.ResourceExhausted,
codes.InvalidArgument,
codes.NotFound:
case codes.ResourceExhausted, codes.InvalidArgument, codes.NotFound, codes.Canceled:
withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
}

// we don't always get a gRPC codes.Canceled status code, so check for context.Canceled and http 499 as well
if errors.Is(err, context.Canceled) || (resp != nil && resp.StatusCode == util.StatusClientClosedRequest) {
withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
}
return
Expand Down
100 changes: 100 additions & 0 deletions modules/frontend/slos_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package frontend

import (
"context"
"errors"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -165,3 +167,101 @@ func TestBadRequest(t *testing.T) {
require.Equal(t, 1.0, actualAll)
require.Equal(t, 1.0, actualSLO)
}

// TestCanceledRequest verifies that request cancellations — a plain error with an
// HTTP 499 response, a context.Canceled error, or a gRPC codes.Canceled error —
// are counted as within SLO regardless of the response status code, while
// non-canceled requests with the same (SLO-violating) latency are not.
func TestCanceledRequest(t *testing.T) {
	tests := []struct {
		name       string
		statusCode int   // StatusCode of the *http.Response handed to the hook
		err        error // error handed to the hook; nil means the request did not error
		withInSLO  bool  // expected: should the slo counter be incremented?
	}{
		{
			name:       "random error with http response has 499 status code",
			statusCode: util.StatusClientClosedRequest,
			err:        errors.New("foo"),
			withInSLO:  true,
		},
		{
			name:       "context.Canceled error with http response has 499 status code",
			statusCode: util.StatusClientClosedRequest,
			err:        context.Canceled,
			withInSLO:  true,
		},
		{
			name:       "context.Canceled error with 500 status code",
			statusCode: http.StatusInternalServerError,
			err:        context.Canceled,
			withInSLO:  true,
		},
		{
			name:       "context.Canceled error with 200 status code",
			statusCode: http.StatusOK,
			err:        context.Canceled,
			withInSLO:  true,
		},
		{
			name:       "grpc codes.Canceled error with 500 status code",
			statusCode: http.StatusInternalServerError,
			err:        status.Error(codes.Canceled, "foo"),
			withInSLO:  true,
		},
		{
			name:       "grpc codes.Canceled error with 200 status code",
			statusCode: http.StatusOK,
			err:        status.Error(codes.Canceled, "foo"),
			withInSLO:  true,
		},
		{
			name:       "no error with 200 status code",
			statusCode: http.StatusOK,
			err:        nil,
			withInSLO:  false,
		},
		{
			name:       "no error with 500 status code",
			statusCode: http.StatusInternalServerError,
			err:        nil,
			withInSLO:  false,
		},
		{
			name:       "no error with http response has 499 status code",
			statusCode: util.StatusClientClosedRequest,
			err:        nil,
			withInSLO:  false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// fresh counters per sub-test so exact values can be asserted
			allCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "all"}, []string{"tenant"})
			sloCounter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "slo"}, []string{"tenant"})
			throughputVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: "throughput"}, []string{"tenant"})

			hook := sloHook(allCounter, sloCounter, throughputVec, SLOConfig{
				DurationSLO:        10 * time.Second,
				ThroughputBytesSLO: 100,
			})

			res := &http.Response{
				StatusCode: tt.statusCode,
				Status:     "context canceled",
				Body:       io.NopCloser(strings.NewReader("foo")),
			}

			// NOTE: the 15s latency is ABOVE the 10s DurationSLO, so the duration
			// SLO is violated for every case; only the cancellation/error handling
			// in the hook can count a request as within SLO here.
			hook(res, "tenant", 0, 15*time.Second, tt.err)

			// every request is counted in the "all" counter exactly once
			actualAll, err := test.GetCounterValue(allCounter.WithLabelValues("tenant"))
			require.NoError(t, err)
			require.Equal(t, 1.0, actualAll)

			// the "slo" counter is incremented only for cancellations
			actualSLO, err := test.GetCounterValue(sloCounter.WithLabelValues("tenant"))
			require.NoError(t, err)
			if tt.withInSLO {
				require.Equal(t, 1.0, actualSLO)
			} else {
				require.Equal(t, 0.0, actualSLO)
			}
		})
	}
}
Loading
Loading