Skip to content

Commit

Permalink
fix: Surface error during wait timeout for OSS artifact driver API ca…
Browse files Browse the repository at this point in the history
…lls (#5601)
  • Loading branch information
terrytangyuan authored and simster7 committed Apr 19, 2021
1 parent 026c127 commit 0cea612
Showing 1 changed file with 50 additions and 3 deletions.
53 changes: 50 additions & 3 deletions workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
)

// OSSArtifactDriver is a driver for OSS
Expand All @@ -18,8 +19,16 @@ type OSSArtifactDriver struct {
SecretKey string
}

var (
defaultRetry = wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}

// OSS error code reference: https://error-center.alibabacloud.com/status/product/Oss
ossTransientErrorCodes = []string{"RequestTimeout", "QuotaExceeded.Refresh", "Default", "ServiceUnavailable", "Throttling", "RequestTimeTooSkewed", "SocketException", "SocketTimeout", "ServiceBusy", "DomainNetWorkVisitedException", "ConnectionTimeout", "CachedTimeTooLarge"}
)

func (ossDriver *OSSArtifactDriver) newOSSClient() (*oss.Client, error) {
client, err := oss.New(ossDriver.Endpoint, ossDriver.AccessKey, ossDriver.SecretKey)
var options []oss.ClientOption
client, err := oss.New(ossDriver.Endpoint, ossDriver.AccessKey, ossDriver.SecretKey, options...)
if err != nil {
log.Warnf("Failed to create new OSS client: %v", err)
return nil, err
Expand All @@ -29,7 +38,7 @@ func (ossDriver *OSSArtifactDriver) newOSSClient() (*oss.Client, error) {

// Downloads artifacts from OSS compliant storage, e.g., downloading an artifact into local path
func (ossDriver *OSSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
log.Infof("OSS Load path: %s, key: %s", path, inputArtifact.OSS.Key)
osscli, err := ossDriver.newOSSClient()
Expand All @@ -53,7 +62,7 @@ func (ossDriver *OSSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path stri

// Saves an artifact to OSS compliant storage, e.g., uploading a local file to OSS bucket
func (ossDriver *OSSArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
log.Infof("OSS Save path: %s, key: %s", path, outputArtifact.OSS.Key)
osscli, err := ossDriver.newOSSClient()
Expand Down Expand Up @@ -87,3 +96,41 @@ func (ossDriver *OSSArtifactDriver) Save(path string, outputArtifact *wfv1.Artif
})
return err
}

func (ossDriver *OSSArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string, error) {
var files []string
err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
osscli, err := ossDriver.newOSSClient()
if err != nil {
return !isTransientOSSErr(err), err
}
bucket, err := osscli.Bucket(artifact.OSS.Bucket)
if err != nil {
return !isTransientOSSErr(err), err
}
results, err := bucket.ListObjects(oss.Prefix(artifact.OSS.Key))
if err != nil {
return !isTransientOSSErr(err), err
}
for _, object := range results.Objects {
files = append(files, object.Key)
}
return true, nil
})
return files, err
}

func isTransientOSSErr(err error) bool {
if err == nil {
return false
}
if ossErr, ok := err.(oss.ServiceError); ok {
for _, transientErrCode := range ossTransientErrorCodes {
if ossErr.Code == transientErrCode {
return true
}
}
}
return false
}

0 comments on commit 0cea612

Please sign in to comment.