Skip to content

Commit

Permalink
Revisit retry logic (#465)
Browse files Browse the repository at this point in the history
Restrict what's retried, invalidate server more often

Remove unneeded Sleep in tests

Simplify retry loop 💄

Co-authored-by: Rouven Bauer <rouven.bauer@neo4j.com>
  • Loading branch information
fbiville and robsdedude authored Apr 13, 2023
1 parent 4c13aab commit c25cc31
Show file tree
Hide file tree
Showing 29 changed files with 403 additions and 468 deletions.
112 changes: 4 additions & 108 deletions neo4j/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ package neo4j

import (
"context"
"errors"
"fmt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/bolt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/connector"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/pool"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/retry"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/router"
"io"
"net"
)

// IsRetryable determines whether an operation can be retried based on the error
Expand All @@ -42,12 +34,6 @@ func IsRetryable(err error) bool {
if err == nil {
return false
}
var connectivityErr *ConnectivityError
var commitFailedError *retry.CommitFailedDeadError
if errors.As(err, &connectivityErr) && !errors.As(connectivityErr.inner, &commitFailedError) {
// all connectivity errors are safe to retry except during transaction commit
return true
}
return retry.IsRetryable(err)
}

Expand All @@ -56,55 +42,11 @@ func IsRetryable(err error) bool {
// used internally.
type Neo4jError = db.Neo4jError

// UsageError represents errors caused by incorrect usage of the driver API.
// This does not include Cypher syntax errors (those will be Neo4jError).
type UsageError struct {
Message string
}

func (e *UsageError) Error() string {
return e.Message
}

// TransactionExecutionLimit error indicates that a retryable transaction has
// failed due to reaching a limit like a timeout or maximum number of attempts.
type TransactionExecutionLimit struct {
Errors []error
Causes []string
}

func newTransactionExecutionLimit(errors []error, causes []string) *TransactionExecutionLimit {
tel := &TransactionExecutionLimit{Errors: make([]error, len(errors)), Causes: causes}
for i, err := range errors {
tel.Errors[i] = wrapError(err)
}

return tel
}

func (e *TransactionExecutionLimit) Error() string {
cause := "Unknown cause"
l := len(e.Causes)
if l > 0 {
cause = e.Causes[l-1]
}
var err error
l = len(e.Errors)
if l > 0 {
err = e.Errors[l-1]
}
return fmt.Sprintf("TransactionExecutionLimit: %s after %d attempts, last error: %s", cause, len(e.Errors), err)
}
type UsageError = errorutil.UsageError

// ConnectivityError represents errors caused by the driver not being able to connect to Neo4j services,
// or by lost connections.
type ConnectivityError struct {
inner error
}
type ConnectivityError = errorutil.ConnectivityError

func (e *ConnectivityError) Error() string {
return fmt.Sprintf("ConnectivityError: %s", e.inner.Error())
}
type TransactionExecutionLimit = errorutil.TransactionExecutionLimit

// IsNeo4jError returns true if the provided error is an instance of Neo4jError.
func IsNeo4jError(err error) bool {
Expand All @@ -130,53 +72,7 @@ func IsTransactionExecutionLimit(err error) bool {
return is
}

// TokenExpiredError represents a server error reported with the code
// Neo.ClientError.Security.TokenExpired.
type TokenExpiredError struct {
Code string
Message string
}

func (e *TokenExpiredError) Error() string {
return fmt.Sprintf("TokenExpiredError: %s (%s)", e.Code, e.Message)
}

func wrapError(err error) error {
if err == nil {
return nil
}
if err == io.EOF {
return &ConnectivityError{inner: err}
}
switch e := err.(type) {
case *db.UnsupportedTypeError, *db.FeatureNotSupportedError:
// Usage of a type not supported by database network protocol or feature
// not supported by current version or edition.
return &UsageError{Message: err.Error()}
case *pool.PoolClosed:
return &UsageError{Message: err.Error()}
case *connector.TlsError, net.Error:
return &ConnectivityError{inner: err}
case *pool.PoolTimeout, *pool.PoolFull:
return &ConnectivityError{inner: err}
case *router.ReadRoutingTableError:
return &ConnectivityError{inner: err}
case *retry.CommitFailedDeadError:
return &ConnectivityError{inner: err}
case *bolt.ConnectionReadTimeout:
return &ConnectivityError{inner: err}
case *bolt.ConnectionWriteTimeout:
return &ConnectivityError{inner: err}
case *db.Neo4jError:
if e.Code == "Neo.ClientError.Security.TokenExpired" {
return &TokenExpiredError{Code: e.Code, Message: e.Msg}
}
}
if err != nil && err.Error() == bolt.InvalidTransactionError {
return &UsageError{Message: bolt.InvalidTransactionError}
}
return err
}
type TokenExpiredError = errorutil.TokenExpiredError

type ctxCloser interface {
Close(ctx context.Context) error
Expand Down
8 changes: 4 additions & 4 deletions neo4j/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package neo4j
import (
"fmt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/retry"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"testing"
)

Expand All @@ -34,7 +34,7 @@ func TestIsRetryable(outer *testing.T) {

testCases := []retryableTestCase{
{true, &ConnectivityError{
inner: fmt.Errorf("hello, is it me you are looking for"),
Inner: fmt.Errorf("hello, is it me you are looking for"),
}},
{true, &db.Neo4jError{
Code: "Neo.TransientError.No.Stress",
Expand All @@ -50,7 +50,7 @@ func TestIsRetryable(outer *testing.T) {
}},
{false, nil},
{false, &ConnectivityError{
inner: &retry.CommitFailedDeadError{},
Inner: &errorutil.CommitFailedDeadError{},
}},
{false, &db.Neo4jError{
Code: "Neo.TransientError.Transaction.Terminated",
Expand All @@ -68,7 +68,7 @@ func TestIsRetryable(outer *testing.T) {
}

for _, testCase := range testCases {
outer.Run(fmt.Sprintf("is error %s retryable?", testCase.err), func(t *testing.T) {
outer.Run(fmt.Sprintf("is error %v retryable?", testCase.err), func(t *testing.T) {
expected := testCase.isRetryable

actual := IsRetryable(testCase.err)
Expand Down
3 changes: 2 additions & 1 deletion neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"net"
"time"

Expand Down Expand Up @@ -271,7 +272,7 @@ func (b *bolt3) TxBegin(
// misuse from clients that stick to their connections when they shouldn't.
func (b *bolt3) assertTxHandle(h1, h2 idb.TxHandle) error {
if h1 != h2 {
err := errors.New(InvalidTransactionError)
err := errors.New(errorutil.InvalidTransactionError)
b.log.Error(log.Bolt3, b.logId, err)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/collections"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"net"
"time"

Expand Down Expand Up @@ -311,7 +312,7 @@ func (b *bolt4) TxBegin(
// misuse from clients that stick to their connections when they shouldn't.
func (b *bolt4) assertTxHandle(h1, h2 idb.TxHandle) error {
if h1 != h2 {
err := errors.New(InvalidTransactionError)
err := errors.New(errorutil.InvalidTransactionError)
b.log.Error(log.Bolt4, b.logId, err)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion neo4j/internal/bolt/bolt5.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"net"
"time"

Expand Down Expand Up @@ -300,7 +301,7 @@ func (b *bolt5) TxBegin(
// misuse from clients that stick to their connections when they shouldn't.
func (b *bolt5) assertTxHandle(h1, h2 idb.TxHandle) error {
if h1 != h2 {
err := errors.New(InvalidTransactionError)
err := errors.New(errorutil.InvalidTransactionError)
b.log.Error(log.Bolt5, b.logId, err)
return err
}
Expand Down
13 changes: 7 additions & 6 deletions neo4j/internal/bolt/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package bolt
import (
"context"
"encoding/binary"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
rio "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing"
"io"
)
Expand Down Expand Up @@ -111,15 +112,15 @@ func (c *chunker) send(ctx context.Context, wr io.Writer) error {
}

func processWriteError(err error, ctx context.Context) error {
if IsTimeoutError(err) {
return &ConnectionWriteTimeout{
userContext: ctx,
err: err,
if errorutil.IsTimeoutError(err) {
return &errorutil.ConnectionWriteTimeout{
UserContext: ctx,
Err: err,
}
}
if err == context.Canceled {
return &ConnectionWriteCanceled{
err: err,
return &errorutil.ConnectionWriteCanceled{
Err: err,
}
}
return err
Expand Down
8 changes: 4 additions & 4 deletions neo4j/internal/bolt/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func handleTerminatedContextError(err error, connection net.Conn) error {

func contextTerminatedErr(err error) bool {
switch err.(type) {
case *ConnectionWriteTimeout:
case *errorutil.ConnectionWriteTimeout:
return true
case *ConnectionReadTimeout:
case *errorutil.ConnectionReadTimeout:
return true
case *ConnectionWriteCanceled:
case *errorutil.ConnectionWriteCanceled:
return true
case *ConnectionReadCanceled:
case *errorutil.ConnectionReadCanceled:
return true
}
return false
Expand Down
15 changes: 8 additions & 7 deletions neo4j/internal/bolt/dechunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package bolt
import (
"context"
"encoding/binary"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
rio "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/racing"
"net"
"time"
Expand Down Expand Up @@ -86,16 +87,16 @@ func newContext(ctx context.Context, readTimeout time.Duration) (context.Context
}

func processReadError(err error, ctx context.Context, readTimeout time.Duration) error {
if IsTimeoutError(err) {
return &ConnectionReadTimeout{
userContext: ctx,
readTimeout: readTimeout,
err: err,
if errorutil.IsTimeoutError(err) {
return &errorutil.ConnectionReadTimeout{
UserContext: ctx,
ReadTimeout: readTimeout,
Err: err,
}
}
if err == context.Canceled {
return &ConnectionReadCanceled{
err: err,
return &errorutil.ConnectionReadCanceled{
Err: err,
}
}
return err
Expand Down
15 changes: 2 additions & 13 deletions neo4j/internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/config"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/errorutil"
"io"
"net"
"time"
Expand Down Expand Up @@ -96,7 +97,7 @@ func (c Connector) Connect(ctx context.Context, address string, boltLogger log.B
err = errors.New("remote end closed the connection, check that TLS is enabled on the server")
}
conn.Close()
return nil, &TlsError{inner: err}
return nil, &errorutil.TlsError{Inner: err}
}
connection, err := bolt.Connect(ctx,
address,
Expand Down Expand Up @@ -141,15 +142,3 @@ func (c Connector) tlsConfig(serverName string) *tls.Config {
config.ServerName = serverName
return config
}

// TlsError encapsulates all errors related to TLS connection creation.
// This is needed since the tls package does not provide a common error type
// à la net.Error, and a common type is needed to properly classify the error
// for Testkit.
type TlsError struct {
inner error
}

func (e *TlsError) Error() string {
return e.inner.Error()
}
Loading

0 comments on commit c25cc31

Please sign in to comment.