Skip to content

Commit

Permalink
fix: retry ExecutorPlugin invocation on transient network errors Fixes:
Browse files Browse the repository at this point in the history
argoproj#9664 (argoproj#9665)

Signed-off-by: juchao <juchao@coscene.io>
  • Loading branch information
Suckzoo authored and juchaosong committed Nov 3, 2022
1 parent 0afec97 commit c3a9527
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 19 deletions.
20 changes: 10 additions & 10 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,16 @@ func isResourceQuotaConflictErr(err error) bool {

func isTransientNetworkErr(err error) bool {
switch err.(type) {
case net.Error:
switch err.(type) {
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
return true
case *url.Error:
// For a URL error, where it replies back "connection closed"
// retry again.
return strings.Contains(err.Error(), "Connection closed by foreign host")
}
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
return true
}

errorString := generateErrorString(err)
if strings.Contains(errorString, "net/http: TLS handshake timeout") {
if strings.Contains(errorString, "Connection closed by foreign host") {
// For a URL error, where it replies back "connection closed"
// retry again.
return true
} else if strings.Contains(errorString, "net/http: TLS handshake timeout") {
// If error is - tlsHandshakeTimeoutError, retry.
return true
} else if strings.Contains(errorString, "i/o timeout") {
Expand All @@ -83,6 +80,9 @@ func isTransientNetworkErr(err error) bool {
} else if strings.Contains(errorString, "connection reset by peer") {
// If err is a ECONNRESET, retry.
return true
} else if _, ok := err.(*url.Error); ok && strings.Contains(errorString, "EOF") {
// If err is EOF, retry.
return true
}

return false
Expand Down
51 changes: 43 additions & 8 deletions util/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,31 @@ func (n netError) Error() string { return string(n) }
func (n netError) Timeout() bool { return false }
func (n netError) Temporary() bool { return false }

func urlError(errString string) *url.Error {
return &url.Error{
Op: "Get",
URL: "https://argoproj.github.io",
Err: errors.New(errString),
}
}

var (
tlsHandshakeTimeoutErr net.Error = netError("net/http: TLS handshake timeout")
ioTimeoutErr net.Error = netError("i/o timeout")
connectionTimedout net.Error = netError("connection timed out")
connectionReset net.Error = netError("connection reset by peer")
connectionTimedoutErr net.Error = netError("connection timed out")
connectionResetErr net.Error = netError("connection reset by peer")
transientErr net.Error = netError("this error is transient")
transientExitErr = exec.ExitError{
ProcessState: &os.ProcessState{},
Stderr: []byte("this error is transient"),
}

connectionClosedUErr *url.Error = urlError("Connection closed by foreign host")
tlsHandshakeTimeoutUErr *url.Error = urlError("net/http: TLS handshake timeout")
ioTimeoutUErr *url.Error = urlError("i/o timeout")
connectionTimedoutUErr *url.Error = urlError("connection timed out")
connectionResetUErr *url.Error = urlError("connection reset by peer")
EOFUErr *url.Error = urlError("EOF")
)

const transientEnvVarKey = "TRANSIENT_ERROR_PATTERN"
Expand Down Expand Up @@ -57,21 +72,17 @@ func TestIsTransientErr(t *testing.T) {
t.Run("UnknownNetworkError", func(t *testing.T) {
assert.True(t, IsTransientErr(net.UnknownNetworkError("")))
})
t.Run("ConnectionClosedErr", func(t *testing.T) {
assert.False(t, IsTransientErr(&url.Error{Err: errors.New("")}))
assert.True(t, IsTransientErr(&url.Error{Err: errors.New("Connection closed by foreign host")}))
})
t.Run("TLSHandshakeTimeout", func(t *testing.T) {
assert.True(t, IsTransientErr(tlsHandshakeTimeoutErr))
})
t.Run("IOHandshakeTimeout", func(t *testing.T) {
assert.True(t, IsTransientErr(ioTimeoutErr))
})
t.Run("ConnectionTimeout", func(t *testing.T) {
assert.True(t, IsTransientErr(connectionTimedout))
assert.True(t, IsTransientErr(connectionTimedoutErr))
})
t.Run("ConnectionReset", func(t *testing.T) {
assert.True(t, IsTransientErr(connectionReset))
assert.True(t, IsTransientErr(connectionResetErr))
})
t.Run("TransientErrorPattern", func(t *testing.T) {
_ = os.Setenv(transientEnvVarKey, "this error is transient")
Expand All @@ -91,3 +102,27 @@ func TestIsTransientErr(t *testing.T) {
assert.True(t, IsTransientErr(NewErrTransient("")))
})
}

func TestIsTransientUErr(t *testing.T) {
t.Run("NonExceptionalUErr", func(t *testing.T) {
assert.False(t, IsTransientErr(&url.Error{Err: errors.New("")}))
})
t.Run("ConnectionClosedUErr", func(t *testing.T) {
assert.True(t, IsTransientErr(connectionClosedUErr))
})
t.Run("TLSHandshakeTimeoutUErr", func(t *testing.T) {
assert.True(t, IsTransientErr(tlsHandshakeTimeoutUErr))
})
t.Run("IOHandshakeTimeoutUErr", func(t *testing.T) {
assert.True(t, IsTransientErr(ioTimeoutUErr))
})
t.Run("ConnectionTimeoutUErr", func(t *testing.T) {
assert.True(t, IsTransientErr(connectionTimedoutUErr))
})
t.Run("ConnectionResetUErr", func(t *testing.T) {
assert.True(t, IsTransientErr(connectionResetUErr))
})
t.Run("EOFUErr", func(t *testing.T) {
assert.True(t, IsTransientErr(EOFUErr))
})
}
2 changes: 1 addition & 1 deletion workflow/util/plugins/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Client) Call(ctx context.Context, method string, args interface{}, repl
return true
}
}
return strings.Contains(err.Error(), "connection refused")
return strings.Contains(err.Error(), "connection refused") || errors.IsTransientErr(err)
}, func() error {
log.Debug("Calling plugin")
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/api/v1/%s", p.address, method), bytes.NewBuffer(body))
Expand Down

0 comments on commit c3a9527

Please sign in to comment.