Skip to content

Commit

Permalink
feat(controller): Retry transient offload errors. Resolves argoproj#4464
Browse files Browse the repository at this point in the history


Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Nov 6, 2020
1 parent a441a97 commit 28f9676
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 1 deletion.
76 changes: 76 additions & 0 deletions persist/sqldb/retry/offload_node_status_repo_with_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package retry

import (
"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/util/retry"
)

// offloadNodeStatusRepoWithRetry is a decorator around a
// sqldb.OffloadNodeStatusRepo. Its methods forward to the wrapped repository,
// running the fallible ones under the backoff policy in retry.DefaultRetry.
type offloadNodeStatusRepoWithRetry struct {
// delegate is the wrapped repository that performs the real work.
delegate sqldb.OffloadNodeStatusRepo
}

// WithRetry decorates delegate with retry behaviour and returns the result as
// a sqldb.OffloadNodeStatusRepo, so it can be dropped in wherever the
// undecorated repository was used.
func WithRetry(delegate sqldb.OffloadNodeStatusRepo) sqldb.OffloadNodeStatusRepo {
	wrapped := offloadNodeStatusRepoWithRetry{delegate: delegate}
	return &wrapped
}

// Save forwards to the delegate's Save, retrying under retry.DefaultRetry for
// as long as the delegate fails with a transient error.
//
// It returns the version reported by the last attempt together with:
//   - nil on success,
//   - the delegate's error as soon as it is not transient,
//   - the last transient error if every attempt allowed by the backoff failed.
func (o *offloadNodeStatusRepoWithRetry) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) {
	var (
		version string
		lastErr error
	)
	waitErr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
		version, lastErr = o.delegate.Save(uid, namespace, nodes)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops on the first non-nil error, so a
			// transient failure has to be reported as "not done, no error"
			// for another attempt to be made.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Prefer the delegate's own error over the generic wait timeout so
		// the caller can see why the operation failed.
		return version, lastErr
	}
	return version, waitErr
}

// Get forwards to the delegate's Get, retrying under retry.DefaultRetry for as
// long as the delegate fails with a transient error.
//
// It returns the nodes reported by the last attempt together with:
//   - nil on success,
//   - the delegate's error as soon as it is not transient,
//   - the last transient error if every attempt allowed by the backoff failed.
func (o *offloadNodeStatusRepoWithRetry) Get(uid, version string) (wfv1.Nodes, error) {
	var (
		nodes   wfv1.Nodes
		lastErr error
	)
	waitErr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
		nodes, lastErr = o.delegate.Get(uid, version)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops on the first non-nil error, so a
			// transient failure has to be reported as "not done, no error"
			// for another attempt to be made.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Prefer the delegate's own error over the generic wait timeout so
		// the caller can see why the operation failed.
		return nodes, lastErr
	}
	return nodes, waitErr
}

// done reports whether an operation that finished with err should stop being
// retried: true on success (nil) and for any error that
// errors.IsTransientErr does not classify as transient.
func done(err error) bool {
	if err == nil {
		return true
	}
	return !errors.IsTransientErr(err)
}

// List forwards to the delegate's List, retrying under retry.DefaultRetry for
// as long as the delegate fails with a transient error.
//
// It returns the map reported by the last attempt together with:
//   - nil on success,
//   - the delegate's error as soon as it is not transient,
//   - the last transient error if every attempt allowed by the backoff failed.
func (o *offloadNodeStatusRepoWithRetry) List(namespace string) (map[sqldb.UUIDVersion]wfv1.Nodes, error) {
	var (
		nodes   map[sqldb.UUIDVersion]wfv1.Nodes
		lastErr error
	)
	waitErr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
		nodes, lastErr = o.delegate.List(namespace)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops on the first non-nil error, so a
			// transient failure has to be reported as "not done, no error"
			// for another attempt to be made.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Prefer the delegate's own error over the generic wait timeout so
		// the caller can see why the operation failed.
		return nodes, lastErr
	}
	return nodes, waitErr
}

// ListOldOffloads forwards to the delegate's ListOldOffloads, retrying under
// retry.DefaultRetry for as long as the delegate fails with a transient error.
//
// It returns the slice reported by the last attempt together with:
//   - nil on success,
//   - the delegate's error as soon as it is not transient,
//   - the last transient error if every attempt allowed by the backoff failed.
func (o *offloadNodeStatusRepoWithRetry) ListOldOffloads(namespace string) ([]sqldb.UUIDVersion, error) {
	var (
		versions []sqldb.UUIDVersion
		lastErr  error
	)
	waitErr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
		versions, lastErr = o.delegate.ListOldOffloads(namespace)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops on the first non-nil error, so a
			// transient failure has to be reported as "not done, no error"
			// for another attempt to be made.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Prefer the delegate's own error over the generic wait timeout so
		// the caller can see why the operation failed.
		return versions, lastErr
	}
	return versions, waitErr
}

// Delete forwards to the delegate's Delete, retrying under retry.DefaultRetry
// for as long as the delegate fails with a transient error.
//
// It returns:
//   - nil on success,
//   - the delegate's error as soon as it is not transient,
//   - the last transient error if every attempt allowed by the backoff failed.
func (o *offloadNodeStatusRepoWithRetry) Delete(uid, version string) error {
	var lastErr error
	waitErr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
		lastErr = o.delegate.Delete(uid, version)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops on the first non-nil error, so a
			// transient failure has to be reported as "not done, no error"
			// for another attempt to be made.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Prefer the delegate's own error over the generic wait timeout so
		// the caller can see why the operation failed.
		return lastErr
	}
	return waitErr
}

// IsEnabled returns whatever the wrapped repository's IsEnabled returns. The
// call is forwarded directly, with no retry.
func (o *offloadNodeStatusRepoWithRetry) IsEnabled() bool {
	enabled := o.delegate.IsEnabled()
	return enabled
}

// Compile-time check that *offloadNodeStatusRepoWithRetry implements
// sqldb.OffloadNodeStatusRepo.
var _ sqldb.OffloadNodeStatusRepo = (*offloadNodeStatusRepoWithRetry)(nil)
85 changes: 85 additions & 0 deletions persist/sqldb/retry/offload_node_status_repo_with_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package retry

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apierr "k8s.io/apimachinery/pkg/api/errors"

"github.com/argoproj/argo/persist/sqldb"
sqldbmocks "github.com/argoproj/argo/persist/sqldb/mocks"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// Error fixtures shared by the tests in this file.
var (
	// transientErr is a "too many requests" API error; Test_done expects
	// done to return false for it.
	transientErr = apierr.NewTooManyRequests("", 0)
	// permanentErr is a plain error; Test_done expects done to return true
	// for it.
	permanentErr = errors.New("")
)

// Test_offloadNodeStatusRepoWithRetry checks that every fallible method of the
// retrying repository survives one transient failure from the delegate, and
// that a permanent failure is returned to the caller.
//
// Each scenario registers two expectations for the same method: a one-shot
// expectation returning the transient error, followed by the final outcome.
// Chaining two Return calls on a single expectation does not work for this:
// the second Return replaces the first, so the transient error would never be
// handed out and the retry path would go untested.
func Test_offloadNodeStatusRepoWithRetry(t *testing.T) {
	t.Run("PermanentError", func(t *testing.T) {
		delegate := &sqldbmocks.OffloadNodeStatusRepo{}
		o := WithRetry(delegate)
		delegate.On("Save", mock.Anything, mock.Anything, mock.Anything).
			Return("", transientErr).
			Once()
		delegate.On("Save", mock.Anything, mock.Anything, mock.Anything).
			Return("", permanentErr)
		_, err := o.Save("my-uid", "my-ns", wfv1.Nodes{})
		assert.Equal(t, permanentErr, err)
		// One transient attempt plus the permanent one; no further retries.
		delegate.AssertNumberOfCalls(t, "Save", 2)
	})
	delegate := &sqldbmocks.OffloadNodeStatusRepo{}
	o := WithRetry(delegate)
	t.Run("Save", func(t *testing.T) {
		delegate.On("Save", "my-uid", "my-ns", mock.Anything).
			Return("", transientErr).
			Once()
		delegate.On("Save", "my-uid", "my-ns", mock.Anything).
			Return("my-version", nil)
		version, err := o.Save("my-uid", "my-ns", wfv1.Nodes{})
		if assert.NoError(t, err) {
			assert.Equal(t, "my-version", version)
		}
		delegate.AssertNumberOfCalls(t, "Save", 2)
	})
	t.Run("Get", func(t *testing.T) {
		delegate.On("Get", "my-uid", "my-version").
			Return(nil, transientErr).
			Once()
		delegate.On("Get", "my-uid", "my-version").
			Return(wfv1.Nodes{}, nil)
		nodes, err := o.Get("my-uid", "my-version")
		if assert.NoError(t, err) {
			assert.NotNil(t, nodes)
		}
		delegate.AssertNumberOfCalls(t, "Get", 2)
	})
	t.Run("List", func(t *testing.T) {
		delegate.On("List", "my-ns").
			Return(nil, transientErr).
			Once()
		delegate.On("List", "my-ns").
			Return(make(map[sqldb.UUIDVersion]wfv1.Nodes), nil)
		list, err := o.List("my-ns")
		if assert.NoError(t, err) {
			assert.NotNil(t, list)
		}
		delegate.AssertNumberOfCalls(t, "List", 2)
	})
	t.Run("ListOldOffloads", func(t *testing.T) {
		delegate.On("ListOldOffloads", "my-ns").
			Return(nil, transientErr).
			Once()
		delegate.On("ListOldOffloads", "my-ns").
			Return(make([]sqldb.UUIDVersion, 0), nil)
		list, err := o.ListOldOffloads("my-ns")
		if assert.NoError(t, err) {
			assert.NotNil(t, list)
		}
		delegate.AssertNumberOfCalls(t, "ListOldOffloads", 2)
	})
	t.Run("Delete", func(t *testing.T) {
		delegate.On("Delete", "my-uid", "my-version").
			Return(transientErr).
			Once()
		delegate.On("Delete", "my-uid", "my-version").
			Return(nil)
		err := o.Delete("my-uid", "my-version")
		assert.NoError(t, err)
		delegate.AssertNumberOfCalls(t, "Delete", 2)
	})
	t.Run("IsEnabled", func(t *testing.T) {
		delegate.On("IsEnabled").
			Return(true)
		assert.True(t, o.IsEnabled())
	})
}

func Test_done(t *testing.T) {
assert.True(t, done(nil))
assert.False(t, done(transientErr))
assert.True(t, done(permanentErr))
}
2 changes: 2 additions & 0 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/argoproj/argo"
"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/retry"
clusterwftemplatepkg "github.com/argoproj/argo/pkg/apiclient/clusterworkflowtemplate"
cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
eventpkg "github.com/argoproj/argo/pkg/apiclient/event"
Expand Down Expand Up @@ -157,6 +158,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
offloadRepo = retry.WithRetry(offloadRepo)
// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/fixtures/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/retry"
"github.com/argoproj/argo/util/instanceid"
)

Expand All @@ -32,6 +33,7 @@ func newPersistence(kubeClient kubernetes.Interface, wcConfig *config.Config) *P
if err != nil {
panic(err)
}
offloadNodeStatusRepo = retry.WithRetry(offloadNodeStatusRepo)
instanceIDService := instanceid.NewService(wcConfig.InstanceID)
workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), Namespace, instanceIDService)
return &Persistence{session, offloadNodeStatusRepo, workflowArchive}
Expand Down
4 changes: 3 additions & 1 deletion workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/argoproj/argo/config"
"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/retry"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/workflow/hydrator"
)
Expand Down Expand Up @@ -51,10 +52,11 @@ func (wfc *WorkflowController) updateConfig(v interface{}) error {

wfc.session = session
if persistence.NodeStatusOffload {
wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
offloadNodeStatusRepo, err := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
if err != nil {
return err
}
wfc.offloadNodeStatusRepo = retry.WithRetry(offloadNodeStatusRepo)
log.Info("Node status offloading is enabled")
} else {
log.Info("Node status offloading is disabled")
Expand Down

0 comments on commit 28f9676

Please sign in to comment.