Skip to content

Commit

Permalink
Fix 429s tripping SLOs (grafana#3469)
Browse files Browse the repository at this point in the history
* fix 429s tripping SLOs

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* test fix

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored Mar 6, 2024
1 parent 7540d57 commit ae083c3
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
* [BUGFIX] Return unfiltered results when a bad TraceQL query is provided in autocomplete. [#3426](https://github.com/grafana/tempo/pull/3426) (@mapno)
* [BUGFIX] Correctly handle 429s in GRPC search streaming. [#3469](https://github.com/grafana/tempo/pull/3469) (@joe-ellitot)

## v2.4.0

Expand Down
2 changes: 2 additions & 0 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func (c *genericCombiner[T]) erroredResponse() (*http.Response, error) {
var grpcErr error
if c.httpStatusCode/100 == 5 {
grpcErr = status.Error(codes.Internal, c.httpRespBody)
} else if c.httpStatusCode == http.StatusTooManyRequests {
grpcErr = status.Error(codes.ResourceExhausted, c.httpRespBody)
} else {
grpcErr = status.Error(codes.InvalidArgument, c.httpRespBody)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/combiner/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestSearchResponseCombiner(t *testing.T) {
response1: toHTTPResponse(t, &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, 200),
response2: toHTTPResponse(t, nil, 429),
expectedStatus: 429,
expectedGRPCError: status.Error(codes.InvalidArgument, ""),
expectedGRPCError: status.Error(codes.ResourceExhausted, ""),
},
{
name: "500+404",
Expand Down
18 changes: 18 additions & 0 deletions modules/frontend/pipeline/sync_handler_retry.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package pipeline

import (
"errors"
"fmt"
"io"
"net/http"
"strings"

"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/tempo/modules/frontend/queue"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
ot_log "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -64,11 +68,25 @@ func (r retryWare) RoundTrip(req *http.Request) (*http.Response, error) {
return resp, nil
}

/* ---- HTTP GRPC translation ---- */
// the following 2 blocks translate httpgrpc errors into something
// the rest of the http pipeline can understand. these really should
// be there own pipeline item independent of the retry middleware, but
// they've always been here and for safety we're not moving them right now.
if errors.Is(err, queue.ErrTooManyRequests) {
return &http.Response{
StatusCode: http.StatusTooManyRequests,
Status: http.StatusText(http.StatusTooManyRequests),
Body: io.NopCloser(strings.NewReader("job queue full")),
}, nil
}

// do not retry if GRPC error contains response that is not HTTP 5xx
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if ok && !shouldRetry(int(httpResp.Code)) {
return resp, err
}
/* ---- HTTP GRPC translation ---- */

// reached max retries
tries++
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func TestSearchFailurePropagatesFromQueriers(t *testing.T) {
querierMessage: "too fast!",
expectedCode: 429,
expectedMessage: "too fast!",
expectedErr: status.Error(codes.InvalidArgument, "too fast!"),
expectedErr: status.Error(codes.ResourceExhausted, "too fast!"),
},
}

Expand Down
8 changes: 7 additions & 1 deletion modules/frontend/slos.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"net/http"
"time"

"github.com/gogo/status"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
)

const (
Expand Down Expand Up @@ -62,8 +64,12 @@ func sloHook(allByTenantCounter, withinSLOByTenantCounter *prometheus.CounterVec
// first record all queries
allByTenantCounter.WithLabelValues(tenant).Inc()

// now check conditions to see if we should record within SLO
// most errors are SLO violations
if err != nil {
// however, if this is a grpc resource exhausted error (429) then we are within SLO
if status.Code(err) == codes.ResourceExhausted {
withinSLOByTenantCounter.WithLabelValues(tenant).Inc()
}
return
}

Expand Down
7 changes: 7 additions & 0 deletions modules/frontend/slos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/grafana/tempo/pkg/util/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestSLOHook(t *testing.T) {
Expand All @@ -30,6 +32,11 @@ func TestSLOHook(t *testing.T) {
name: "no slo fails : error",
err: errors.New("foo"),
},
{
name: "no slo passes : resource exhausted grpc error",
err: status.Error(codes.ResourceExhausted, "foo"),
expectedWithSLO: 1.0,
},
{
name: "no slo fails : 5XX status code",
httpStatusCode: http.StatusInternalServerError,
Expand Down
9 changes: 1 addition & 8 deletions modules/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"flag"
"fmt"
"net/http"
"time"

"github.com/grafana/dskit/flagext"
Expand All @@ -26,8 +25,6 @@ import (
"github.com/grafana/tempo/pkg/validation"
)

var errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")

// Config for a Frontend.
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
Expand Down Expand Up @@ -377,11 +374,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)

err = f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers)
if errors.Is(err, queue.ErrTooManyRequests) {
return errTooManyRequest
}
return err
return f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers)
}

// CheckReady determines if the query frontend is ready. Function parameters/return
Expand Down

0 comments on commit ae083c3

Please sign in to comment.