fix(artifacts): only retry on transient S3 errors (#5579)
dinever authored Apr 13, 2021
1 parent defbd60 commit b7d6905
Showing 4 changed files with 486 additions and 63 deletions.
29 changes: 29 additions & 0 deletions workflow/artifacts/s3/errors.go
@@ -0,0 +1,29 @@
package s3

import argos3 "github.com/argoproj/pkg/s3"

// s3TransientErrorCodes is a list of S3 error codes that are transient (retryable)
// Reference: https://github.com/minio/minio-go/blob/92fe50d14294782d96402deb861d442992038109/retry.go#L90-L102
var s3TransientErrorCodes = []string{
"InternalError",
"RequestTimeout",
"Throttling",
"ThrottlingException",
"RequestLimitExceeded",
"RequestThrottled",
"InternalError",
"SlowDown",
}

// isTransientS3Err checks if a minio.ErrorResponse error is transient (retryable)
func isTransientS3Err(err error) bool {
if err == nil {
return false
}
for _, transientErrCode := range s3TransientErrorCodes {
if argos3.IsS3ErrCode(err, transientErrCode) {
return true
}
}
return false
}
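For context, the helper above is meant to gate the retry loop that s3.go wires up below: the backoff condition reports done for success and for non-transient failures, and asks for a retry only when the error code is in the transient list. A minimal sketch of that contract follows; retryS3Op and op are illustrative names that are not part of this commit, and the sketch assumes the waitutil import and defaultRetry backoff defined in s3.go.

// retryS3Op is an illustrative wrapper (not part of this commit) showing how
// isTransientS3Err drives the retry decision: done=true on success or on a
// non-transient error, done=false (retry) only for transient S3 error codes.
func retryS3Op(op func() error) error {
	return waitutil.Backoff(defaultRetry, func() (bool, error) {
		if err := op(); err != nil {
			return !isTransientS3Err(err), fmt.Errorf("S3 operation failed: %v", err)
		}
		return true, nil
	})
}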
22 changes: 22 additions & 0 deletions workflow/artifacts/s3/errors_test.go
@@ -0,0 +1,22 @@
package s3

import (
"errors"
"testing"

"github.com/minio/minio-go/v7"
"github.com/stretchr/testify/assert"
)

func TestIsTransientS3Err(t *testing.T) {
for _, errCode := range s3TransientErrorCodes {
err := minio.ErrorResponse{Code: errCode}
assert.True(t, isTransientS3Err(err))
}

err := minio.ErrorResponse{Code: "NoSuchBucket"}
assert.False(t, isTransientS3Err(err))

	nonS3Err := errors.New("UnseenError")
	assert.False(t, isTransientS3Err(nonS3Err))
}
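One branch the test above does not exercise is the explicit nil check in isTransientS3Err. A small additional case along these lines (an editorial sketch, not part of this commit) would cover it:

// TestIsTransientS3ErrNil is an illustrative extra case (not in this commit):
// a nil error must never be treated as transient.
func TestIsTransientS3ErrNil(t *testing.T) {
	assert.False(t, isTransientS3Err(nil))
}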
137 changes: 74 additions & 63 deletions workflow/artifacts/s3/s3.go
@@ -2,6 +2,7 @@ package s3

import (
"context"
"fmt"
"os"
"time"

@@ -13,6 +14,7 @@ import (

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
artifactscommon "github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)
@@ -29,7 +31,10 @@ type ArtifactDriver struct {
Context context.Context
}

-var _ artifactscommon.ArtifactDriver = &ArtifactDriver{}
+var (
+	_ artifactscommon.ArtifactDriver = &ArtifactDriver{}
+	defaultRetry = wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}
+)

// newMinioClient instantiates a new minio client object.
func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (argos3.S3Client, error) {
@@ -51,104 +56,110 @@ func (s3Driver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

-	err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
+	err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
log.Infof("S3 Load path: %s, key: %s", path, inputArtifact.S3.Key)
s3cli, err := s3Driver.newS3Client(ctx)
if err != nil {
log.Warnf("Failed to create new S3 client: %v", err)
return false, nil
}
origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path)
if origErr == nil {
return true, nil
}
if !argos3.IsS3ErrCode(origErr, "NoSuchKey") {
log.Warnf("Failed get file: %v", origErr)
return false, nil
}
// If we get here, the error was a NoSuchKey. The key might be a s3 "directory"
isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", inputArtifact.S3.Bucket, err)
return false, nil
}
if !isDir {
// It's neither a file, nor a directory. Return the original NoSuchKey error
return false, errors.New(errors.CodeNotFound, origErr.Error())
}

if err = s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path); err != nil {
log.Warnf("Failed get directory: %v", err)
return false, nil
return !isTransientS3Err(err), fmt.Errorf("failed to create new S3 client: %v", err)
}
return true, nil
return loadS3Artifact(s3cli, inputArtifact, path)
})

return err
}

+// loadS3Artifact downloads artifacts from an S3 compliant storage
+// returns true if the download is completed or can't be retried (non-transient error)
+// returns false if it can be retried (transient error)
+func loadS3Artifact(s3cli argos3.S3Client, inputArtifact *wfv1.Artifact, path string) (bool, error) {
+	origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path)
+	if origErr == nil {
+		return true, nil
+	}
+	if !argos3.IsS3ErrCode(origErr, "NoSuchKey") {
+		return !isTransientS3Err(origErr), fmt.Errorf("failed to get file: %v", origErr)
+	}
+	// If we get here, the error was a NoSuchKey. The key might be a s3 "directory"
+	isDir, err := s3cli.IsDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key)
+	if err != nil {
+		return !isTransientS3Err(err), fmt.Errorf("failed to test if %s is a directory: %v", inputArtifact.S3.Key, err)
+	}
+	if !isDir {
+		// It's neither a file, nor a directory. Return the original NoSuchKey error
+		return true, errors.New(errors.CodeNotFound, origErr.Error())
+	}
+
+	if err = s3cli.GetDirectory(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path); err != nil {
+		return !isTransientS3Err(err), fmt.Errorf("failed to get directory: %v", err)
+	}
+	return true, nil
+}

// Save saves an artifact to S3 compliant storage
func (s3Driver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

-	err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
+	err := waitutil.Backoff(defaultRetry,
		func() (bool, error) {
			log.Infof("S3 Save path: %s, key: %s", path, outputArtifact.S3.Key)
			s3cli, err := s3Driver.newS3Client(ctx)
			if err != nil {
-				log.Warnf("Failed to create new S3 client: %v", err)
-				return false, nil
-			}
-			isDir, err := file.IsDirectory(path)
-			if err != nil {
-				log.Warnf("Failed to test if %s is a directory: %v", path, err)
-				return false, nil
+				return !isTransientS3Err(err), fmt.Errorf("failed to create new S3 client: %v", err)
			}
-
-			createBucketIfNotPresent := outputArtifact.S3.CreateBucketIfNotPresent
-			if createBucketIfNotPresent != nil {
-				log.Infof("Trying to create bucket: %s", outputArtifact.S3.Bucket)
-				err := s3cli.MakeBucket(outputArtifact.S3.Bucket, minio.MakeBucketOptions{
-					Region: outputArtifact.S3.Region,
-					ObjectLocking: outputArtifact.S3.CreateBucketIfNotPresent.ObjectLocking,
-				})
-				if err != nil {
-					log.Warnf("Failed to create bucket: %v. Error: %v", outputArtifact.S3.Bucket, err)
-				}
-			}
-
-			if isDir {
-				if err = s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
-					log.Warnf("Failed to put directory: %v", err)
-					return false, nil
-				}
-			} else {
-				if err = s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
-					log.Warnf("Failed to put file: %v", err)
-					return false, nil
-				}
-			}
-			return true, nil
+			return saveS3Artifact(s3cli, path, outputArtifact)
		})
	return err
}

+// saveS3Artifact uploads artifacts to an S3 compliant storage
+// returns true if the upload is completed or can't be retried (non-transient error)
+// returns false if it can be retried (transient error)
+func saveS3Artifact(s3cli argos3.S3Client, path string, outputArtifact *wfv1.Artifact) (bool, error) {
+	isDir, err := file.IsDirectory(path)
+	if err != nil {
+		return true, fmt.Errorf("failed to test if %s is a directory: %v", path, err)
+	}
+
+	createBucketIfNotPresent := outputArtifact.S3.CreateBucketIfNotPresent
+	if createBucketIfNotPresent != nil {
+		log.Infof("Trying to create bucket: %s", outputArtifact.S3.Bucket)
+		err := s3cli.MakeBucket(outputArtifact.S3.Bucket, minio.MakeBucketOptions{
+			Region: outputArtifact.S3.Region,
+			ObjectLocking: outputArtifact.S3.CreateBucketIfNotPresent.ObjectLocking,
+		})
+		if err != nil {
+			return !isTransientS3Err(err), fmt.Errorf("failed to create bucket %s: %v", outputArtifact.S3.Bucket, err)
+		}
+	}
+
+	if isDir {
+		if err = s3cli.PutDirectory(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
+			return !isTransientS3Err(err), fmt.Errorf("failed to put directory: %v", err)
+		}
+	} else {
+		if err = s3cli.PutFile(outputArtifact.S3.Bucket, outputArtifact.S3.Key, path); err != nil {
+			return !isTransientS3Err(err), fmt.Errorf("failed to put file: %v", err)
+		}
+	}
+	return true, nil
+}

func (s3Driver *ArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var files []string
-	err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
+	err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
s3cli, err := s3Driver.newS3Client(ctx)
if err != nil {
-				return false, err
+				return !isTransientS3Err(err), fmt.Errorf("failed to create new S3 client: %v", err)
}
files, err = s3cli.ListDirectory(artifact.S3.Bucket, artifact.S3.Key)
if err != nil {
-				return false, err
+				return !isTransientS3Err(err), fmt.Errorf("failed to list directory: %v", err)
}
return true, nil
})
