From fccb91c77abc2e36850d0b95bbc5955e9adf57df Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 5 Sep 2024 10:51:51 -0400 Subject: [PATCH 1/5] Add retry support for ObjectExists in S3 --- .../chunk/client/aws/s3_storage_client.go | 31 +++++++--- .../client/aws/s3_storage_client_test.go | 58 +++++++++++++++++++ 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index 11696f67eddb..b96a10965784 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -307,16 +307,29 @@ func buckets(cfg S3Config) ([]string, error) { func (a *S3ObjectClient) Stop() {} func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { - err := instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { - headObjectInput := &s3.HeadObjectInput{ - Bucket: aws.String(a.bucketFromKey(objectKey)), - Key: aws.String(objectKey), + var lastErr error + + retries := backoff.New(ctx, a.cfg.BackoffConfig) + for retries.Ongoing() { + if ctx.Err() != nil { + return false, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject") } - _, err := a.S3.HeadObject(headObjectInput) - return err - }) - if err != nil { - return false, err + lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + headObjectInput := &s3.HeadObjectInput{ + Bucket: aws.String(a.bucketFromKey(objectKey)), + Key: aws.String(objectKey), + } + _, requestErr := a.S3.HeadObject(headObjectInput) + return requestErr + }) + if lastErr == nil { + return true, nil + } + retries.Wait() + } + + if lastErr != nil { + return false, lastErr } return true, nil diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index 
d966f1a2f9f9..dfa5cd48661e 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -196,6 +196,64 @@ func Test_Hedging(t *testing.T) { } } +func Test_RetryLogic(t *testing.T) { + for _, tc := range []struct { + name string + maxRetries int + do func(c *S3ObjectClient) error + }{ + { + "get object with retries", + 3, + func(c *S3ObjectClient) error { + _, _, err := c.GetObject(context.Background(), "foo") + return err + }, + }, + { + "object exists with retries", + 3, + func(c *S3ObjectClient) error { + _, err := c.ObjectExists(context.Background(), "foo") + return err + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + callCount := atomic.NewInt32(0) + + c, err := NewS3ObjectClient(S3Config{ + AccessKeyID: "foo", + SecretAccessKey: flagext.SecretWithValue("bar"), + BackoffConfig: backoff.Config{MaxRetries: tc.maxRetries}, + BucketNames: "foo", + Inject: func(next http.RoundTripper) http.RoundTripper { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + // Increment the call counter + callNum := callCount.Inc() + + // Fail the first set of calls + if int(callNum) <= tc.maxRetries-1 { + time.Sleep(200 * time.Millisecond) // Simulate latency + return nil, errors.New("simulated error on call") + } + + // Succeed on the last call + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte("object content"))), + }, nil + }) + }, + }, hedging.Config{}) + require.NoError(t, err) + err = tc.do(c) + require.NoError(t, err) + require.Equal(t, tc.maxRetries, int(callCount.Load())) + }) + } +} + func Test_ConfigRedactsCredentials(t *testing.T) { underTest := S3Config{ AccessKeyID: "access key id", From 00e47f83d56f22c60fd2c506e0fe921f2cd409f2 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 5 Sep 2024 12:45:30 -0400 Subject: [PATCH 2/5] Text updates, start of discerning 404 vs other errors --- 
.../chunk/client/aws/s3_storage_client.go | 4 ++-- .../client/aws/s3_storage_client_test.go | 24 +++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index b96a10965784..a6d9501616f1 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -124,7 +124,7 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+"s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object") f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+"s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object") - f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+"s3.max-retries", 5, "Maximum number of times to retry when s3 get Object") + f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+"s3.max-retries", 5, "Maximum number of times to retry for s3 GetObject or ObjectExists") } // Validate config and returns error on failure @@ -312,7 +312,7 @@ func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bo retries := backoff.New(ctx, a.cfg.BackoffConfig) for retries.Ongoing() { if ctx.Err() != nil { - return false, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject") + return false, errors.Wrap(ctx.Err(), "ctx related error during s3 objectExists") } lastErr = instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { headObjectInput := &s3.HeadObjectInput{ diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index dfa5cd48661e..361441df77a4 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -200,11 +200,13 @@ func Test_RetryLogic(t *testing.T) { for 
_, tc := range []struct { name string maxRetries int + exists bool do func(c *S3ObjectClient) error }{ { "get object with retries", 3, + true, func(c *S3ObjectClient) error { _, _, err := c.GetObject(context.Background(), "foo") return err @@ -213,6 +215,16 @@ func Test_RetryLogic(t *testing.T) { { "object exists with retries", 3, + true, + func(c *S3ObjectClient) error { + _, err := c.ObjectExists(context.Background(), "foo") + return err + }, + }, + { + "object doesn't exist with retries", + 3, + false, func(c *S3ObjectClient) error { _, err := c.ObjectExists(context.Background(), "foo") return err @@ -231,6 +243,9 @@ func Test_RetryLogic(t *testing.T) { return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { // Increment the call counter callNum := callCount.Inc() + if !tc.exists { + return nil, awserr.New(s3.ErrCodeNoSuchKey, "NoSuchKey", nil) + } // Fail the first set of calls if int(callNum) <= tc.maxRetries-1 { @@ -248,8 +263,13 @@ func Test_RetryLogic(t *testing.T) { }, hedging.Config{}) require.NoError(t, err) err = tc.do(c) - require.NoError(t, err) - require.Equal(t, tc.maxRetries, int(callCount.Load())) + if tc.exists { + require.NoError(t, err) + require.Equal(t, tc.maxRetries, int(callCount.Load())) + } else { + //require.True(t, errors.As(err, &notFoundErr)) + require.Equal(t, 1, int(callCount.Load())) + } }) } } From 5371da6b7b8910145fb6a3b2e02f2bc51e9e0fd2 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 5 Sep 2024 13:04:45 -0400 Subject: [PATCH 3/5] Hacky way of testing error --- pkg/storage/chunk/client/aws/s3_storage_client.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index a6d9501616f1..b4a63216d862 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -325,6 +325,12 @@ func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey 
string) (bo if lastErr == nil { return true, nil } + // AWS SDK v1 doesn't properly support error unwrapping, so we have to check the error message + // https://github.com/aws/aws-sdk-go/issues/2820#issuecomment-822767966 + if strings.Contains(lastErr.Error(), "NoSuchKey") { + return false, lastErr + } + retries.Wait() } From a22b36990ffa8b59a32a6a20fcbba3e6b628305e Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 5 Sep 2024 13:22:38 -0400 Subject: [PATCH 4/5] make doc --- docs/sources/shared/configuration.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 75b3e85749e7..171845994227 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1562,7 +1562,7 @@ backoff_config: # CLI flag: -s3.max-backoff [max_period: | default = 3s] - # Maximum number of times to retry when s3 get Object + # Maximum number of times to retry for s3 GetObject or ObjectExists # CLI flag: -s3.max-retries [max_retries: | default = 5] @@ -5430,7 +5430,7 @@ backoff_config: # CLI flag: -.storage.s3.max-backoff [max_period: | default = 3s] - # Maximum number of times to retry when s3 get Object + # Maximum number of times to retry for s3 GetObject or ObjectExists # CLI flag: -.storage.s3.max-retries [max_retries: | default = 5] From 5461e19820b91f00aebb485ba2fa29dc61566147 Mon Sep 17 00:00:00 2001 From: Paul Rogers Date: Thu, 5 Sep 2024 15:16:52 -0400 Subject: [PATCH 5/5] Mock around the issue of the underlying error getting wrapped weirdly --- .../chunk/client/aws/s3_storage_client.go | 5 ++- .../client/aws/s3_storage_client_test.go | 36 ++++++++++++++++--- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index b4a63216d862..4785d4667bd3 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ 
b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -325,9 +325,8 @@ func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bo if lastErr == nil { return true, nil } - // AWS SDK v1 doesn't properly support error unwrapping, so we have to check the error message - // https://github.com/aws/aws-sdk-go/issues/2820#issuecomment-822767966 - if strings.Contains(lastErr.Error(), "NoSuchKey") { + + if a.IsObjectNotFoundErr(lastErr) { return false, lastErr } diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index 361441df77a4..1cf020e7b9ec 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -196,6 +196,15 @@ func Test_Hedging(t *testing.T) { } } +type MockS3Client struct { + s3.S3 + HeadObjectFunc func(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error) +} + +func (m *MockS3Client) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + return m.HeadObjectFunc(input) +} + func Test_RetryLogic(t *testing.T) { for _, tc := range []struct { name string @@ -234,6 +243,27 @@ func Test_RetryLogic(t *testing.T) { t.Run(tc.name, func(t *testing.T) { callCount := atomic.NewInt32(0) + mockS3 := &MockS3Client{ + HeadObjectFunc: func(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + callNum := callCount.Inc() + if !tc.exists { + rfIn := awserr.NewRequestFailure( + awserr.New("NotFound", "Not Found", nil), 404, "abc", + ) + return nil, rfIn + } + + // Fail the first set of calls + if int(callNum) <= tc.maxRetries-1 { + time.Sleep(200 * time.Millisecond) // Simulate latency + return nil, errors.New("simulated error on mock call") + } + + // Succeed on the last call + return &s3.HeadObjectOutput{}, nil + }, + } + c, err := NewS3ObjectClient(S3Config{ AccessKeyID: "foo", SecretAccessKey: flagext.SecretWithValue("bar"), @@ -243,9 +273,6 @@ func Test_RetryLogic(t *testing.T) { return 
RoundTripperFunc(func(req *http.Request) (*http.Response, error) { // Increment the call counter callNum := callCount.Inc() - if !tc.exists { - return nil, awserr.New(s3.ErrCodeNoSuchKey, "NoSuchKey", nil) - } // Fail the first set of calls if int(callNum) <= tc.maxRetries-1 { @@ -262,12 +289,13 @@ func Test_RetryLogic(t *testing.T) { }, }, hedging.Config{}) require.NoError(t, err) + c.S3 = mockS3 err = tc.do(c) if tc.exists { require.NoError(t, err) require.Equal(t, tc.maxRetries, int(callCount.Load())) } else { - //require.True(t, errors.As(err, &notFoundErr)) + require.True(t, c.IsObjectNotFoundErr(err)) require.Equal(t, 1, int(callCount.Load())) } })