Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement IsRetryableErr for S3ObjectClient #14174

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
awscommon "github.com/grafana/dskit/aws"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/instrument"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

amnet "k8s.io/apimachinery/pkg/util/net"

bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
Expand Down Expand Up @@ -532,5 +535,61 @@ func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool {
return false
}

// TODO(dannyk): implement for client

// IsRetryableErr is a placeholder: it ignores its argument and reports every
// error as non-retryable.
func (a *S3ObjectClient) IsRetryableErr(_ error) bool {
	return false
}
// isTimeoutError reports whether err, or any error in its wrap chain, is a
// net.Error whose Timeout method returns true.
func isTimeoutError(err error) bool {
	var timeoutErr net.Error
	if !errors.As(err, &timeoutErr) {
		// Nothing in the chain is a net.Error, so it cannot be a network timeout.
		return false
	}
	return timeoutErr.Timeout()
}

// isContextErr reports whether err, or any error in its wrap chain, is
// context.DeadlineExceeded or context.Canceled.
func isContextErr(err error) bool {
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return true
	case errors.Is(err, context.Canceled):
		return true
	default:
		return false
	}
}

// IsStorageTimeoutErr returns true if error means that object cannot be
// retrieved right now due to server-side timeouts.
//
// The checks run in a fixed order and the first match decides the result:
//  1. context cancellation/deadline errors          -> false (client-side)
//  2. net.ErrClosed or connection refused           -> false (client-side)
//  3. a net.Error whose Timeout() is true           -> true
//  4. io.EOF or connection reset                    -> true
//  5. an AWS request failure with HTTP 408 or 504   -> true
//
// Anything else, including a nil error, yields false.
func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool {
	// TODO(dannyk): move these out to be generic
	// context errors are all client-side.
	// This check must stay ahead of isTimeoutError: context.DeadlineExceeded
	// itself satisfies net.Error with Timeout() == true.
	if isContextErr(err) {
		return false
	}

	// connection misconfiguration, or writing on a closed connection
	// do NOT retry; this is not a server-side issue
	if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) {
		return false
	}

	// this is a server-side timeout
	if isTimeoutError(err) {
		return true
	}

	// connection closed (closed before established) or reset (closed after established)
	// this is a server-side issue
	if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) {
		return true
	}

	// Use errors.As rather than a direct type assertion so that a request
	// failure wrapped by a caller is still recognised, matching the
	// errors.Is/errors.As handling used by the checks above.
	var rerr awserr.RequestFailure
	if errors.As(err, &rerr) {
		// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
		code := rerr.StatusCode()
		return code == http.StatusRequestTimeout ||
			code == http.StatusGatewayTimeout
	}

	return false
}

// IsStorageThrottledErr returns true if error means that object cannot be
// retrieved right now due to throttling.
//
// Only AWS request failures are considered: the result is true when the
// failure's HTTP status is 429 (Too Many Requests) or any 5xx code. Note that
// 504 therefore matches here as well as in IsStorageTimeoutErr. Any other
// error, including nil, yields false.
func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool {
	// Use errors.As rather than a direct type assertion so that a request
	// failure wrapped by a caller is still recognised.
	var rerr awserr.RequestFailure
	if !errors.As(err, &rerr) {
		return false
	}

	// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
	code := rerr.StatusCode()
	return code == http.StatusTooManyRequests ||
		code/100 == 5 // all 5xx errors are retryable
}
// IsRetryableErr reports whether err is classified as retryable, i.e. whether
// IsStorageTimeoutErr or IsStorageThrottledErr returns true for it. The
// throttling check is only consulted when the timeout check does not match.
func (a *S3ObjectClient) IsRetryableErr(err error) bool {
	if a.IsStorageTimeoutErr(err) {
		return true
	}
	return a.IsStorageThrottledErr(err)
}
104 changes: 104 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -73,6 +75,108 @@ func TestIsObjectNotFoundErr(t *testing.T) {
}
}

func TestIsRetryableErr(t *testing.T) {
tests := []struct {
err error
expected bool
name string
}{
{
name: "IsStorageThrottledErr - Too Many Requests",
err: awserr.NewRequestFailure(
awserr.New("TooManyRequests", "TooManyRequests", nil), 429, "reqId",
),
expected: true,
},
{
name: "IsStorageThrottledErr - 500",
err: awserr.NewRequestFailure(
awserr.New("500", "500", nil), 500, "reqId",
),
expected: true,
},
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
{
name: "IsStorageThrottledErr - 5xx",
err: awserr.NewRequestFailure(
awserr.New("501", "501", nil), 501, "reqId",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - Request Timeout",
err: awserr.NewRequestFailure(
awserr.New("Request Timeout", "Request Timeout", nil), 408, "reqId",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - Gateway Timeout",
err: awserr.NewRequestFailure(
awserr.New("Gateway Timeout", "Gateway Timeout", nil), 504, "reqId",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - EOF",
err: io.EOF,
expected: true,
},
{
name: "IsStorageTimeoutErr - Connection Reset",
err: syscall.ECONNRESET,
expected: true,
},
{
name: "IsStorageTimeoutErr - Timeout Error",
err: awserr.NewRequestFailure(
awserr.New("RequestCanceled", "request canceled due to timeout", nil), 408, "request-id",
),
expected: true,
},
{
name: "IsStorageTimeoutErr - Closed",
err: net.ErrClosed,
expected: false,
},
{
name: "IsStorageTimeoutErr - Connection Refused",
err: syscall.ECONNREFUSED,
expected: false,
},
{
name: "IsStorageTimeoutErr - Context Deadline Exceeded",
err: context.DeadlineExceeded,
expected: false,
},
{
name: "IsStorageTimeoutErr - Context Canceled",
err: context.Canceled,
expected: false,
},
{
name: "Not a retryable error",
err: syscall.EINVAL,
expected: false,
},
{
name: "Not found 404",
err: awserr.NewRequestFailure(
awserr.New("404", "404", nil), 404, "reqId",
),
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewS3ObjectClient(S3Config{BucketNames: "mybucket"}, hedging.Config{})
require.NoError(t, err)

require.Equal(t, tt.expected, client.IsRetryableErr(tt.err))
})
}
}

func TestRequestMiddleware(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, r.Header.Get("echo-me"))
Expand Down
Loading