diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index b4efe3ebe8..ec2689054d 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -198,18 +198,18 @@ func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision, ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "HeadRevision") defer span.End() - tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}) - if err != nil { - return datastore.NoRevision, fmt.Errorf(errRevision, err) - } - defer tx.Rollback(ctx) - - hlcNow, err := readCRDBNow(ctx, tx) - if err != nil { - return datastore.NoRevision, fmt.Errorf(errRevision, err) - } + var hlcNow datastore.Revision + err := cds.execute(ctx, cds.conn, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error { + var fnErr error + hlcNow, fnErr = readCRDBNow(ctx, tx) + if fnErr != nil { + hlcNow = datastore.NoRevision + return fmt.Errorf(errRevision, fnErr) + } + return nil + }) - return hlcNow, nil + return hlcNow, err } func (cds *crdbDatastore) AddOverlapKey(keySet map[string]struct{}, namespace string) { diff --git a/internal/datastore/crdb/namespace.go b/internal/datastore/crdb/namespace.go index c028bc18a9..661e0e528e 100644 --- a/internal/datastore/crdb/namespace.go +++ b/internal/datastore/crdb/namespace.go @@ -79,21 +79,23 @@ func (cds *crdbDatastore) ReadNamespace( ) (*core.NamespaceDefinition, datastore.Revision, error) { ctx = datastore.SeparateContextWithTracing(ctx) - tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}) - if err != nil { - return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) - } - defer tx.Rollback(ctx) - - if err := prepareTransaction(ctx, tx, revision); err != nil { - return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) - } + var config *core.NamespaceDefinition + var timestamp time.Time + if err := cds.execute(ctx, cds.conn, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error { + var err error + if err := prepareTransaction(ctx, tx, revision); err != nil { + return fmt.Errorf(errUnableToReadConfig, err) + } - config, timestamp, err := loadNamespace(ctx, tx, nsName) - if err != nil { - if errors.As(err, &datastore.ErrNamespaceNotFound{}) { - return nil, datastore.NoRevision, err + config, timestamp, err = loadNamespace(ctx, tx, nsName) + if err != nil { + if errors.As(err, &datastore.ErrNamespaceNotFound{}) { + return err + } + return fmt.Errorf(errUnableToReadConfig, err) } + return nil + }); err != nil { return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err) } diff --git a/internal/datastore/crdb/tx.go b/internal/datastore/crdb/tx.go index 96c961163f..eba9e3deca 100644 --- a/internal/datastore/crdb/tx.go +++ b/internal/datastore/crdb/tx.go @@ -3,7 +3,6 @@ package crdb import ( "context" "errors" - "fmt" "strings" "github.com/jackc/pgconn" @@ -23,18 +22,8 @@ const ( crdbClockSkewMessage = "cannot specify timestamp in the future" errReachedMaxRetry = "maximum retries reached" - - sqlRollback = "ROLLBACK TO SAVEPOINT cockroach_restart" - sqlSavepoint = "SAVEPOINT cockroach_restart" - sqlReleaseSavepoint = "RELEASE SAVEPOINT cockroach_restart" ) -var retryHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "crdb_client_retries", - Help: "cockroachdb client-side retry distribution", - Buckets: []float64{0, 1, 2, 5, 10, 20, 50}, -}) - var resetHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "crdb_client_resets", Help: "cockroachdb client-side tx reset distribution", @@ -42,7 +31,6 @@ var resetHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ }) func init() { - prometheus.MustRegister(retryHistogram) prometheus.MustRegister(resetHistogram) } @@ -55,21 +43,6 @@ type transactionFn func(tx pgx.Tx) error type executeTxRetryFunc func(context.Context, conn, pgx.TxOptions, transactionFn) error -// RetryError wraps an error that prevented the transaction function from being retried. -type RetryError struct { - err error -} - -// Error returns the wrapped error -func (e RetryError) Error() string { - return fmt.Errorf("unable to retry conflicted transaction: %w", e.err).Error() -} - -// Unwrap returns the wrapped, non-retriable error -func (e RetryError) Unwrap() error { - return e.err -} - func executeWithMaxRetries(max int) executeTxRetryFunc { return func(ctx context.Context, conn conn, txOptions pgx.TxOptions, fn transactionFn) (err error) { return executeWithResets(ctx, conn, txOptions, fn, max) @@ -78,9 +51,8 @@ func executeWithMaxRetries(max int) executeTxRetryFunc { // executeWithResets executes transactionFn and resets the tx when ambiguous crdb errors are encountered. func executeWithResets(ctx context.Context, conn conn, txOptions pgx.TxOptions, fn transactionFn, maxRetries int) (err error) { - var retries, resets int + var resets int defer func() { - retryHistogram.Observe(float64(retries)) resetHistogram.Observe(float64(resets)) }() @@ -102,24 +74,19 @@ func executeWithResets(ctx context.Context, conn conn, txOptions pgx.TxOptions, }() // NOTE: n maxRetries can yield n+1 executions of the transaction fn - for retries = 0; retries <= maxRetries; retries++ { + for resets = 0; resets <= maxRetries; resets++ { tx, err = resetExecution(ctx, conn, tx, txOptions) if err != nil { log.Err(err).Msg("error resetting transaction") - resets++ - continue - } - - retryAttempts, txErr := executeWithRetries(ctx, tx, fn, maxRetries-retries) - retries += retryAttempts - if txErr == nil { - return + if resetable(ctx, err) { + continue + } else { + return + } } - err = txErr - if resetable(ctx, err) { + if err = fn(tx); resetable(ctx, err) { log.Err(err).Msg("resettable error, will attempt to reset tx") - resets++ continue } @@ -129,42 +96,7 @@ func executeWithResets(ctx context.Context, conn conn, txOptions pgx.TxOptions, return } -// executeWithRetries executes the transaction fn and attempts to retry up to maxRetries. -// adapted from https://github.com/cockroachdb/cockroach-go/blob/05d7aaec086fe3288377923bf9a98648d29c44c6/crdb/tx.go#L95 -func executeWithRetries(ctx context.Context, currentTx pgx.Tx, fn transactionFn, maxRetries int) (retries int, err error) { - releasedFn := func(tx pgx.Tx) error { - if err := fn(tx); err != nil { - return err - } - - // RELEASE acts like COMMIT in CockroachDB. We use it since it gives us an - // opportunity to react to retryable errors, whereas tx.Commit() doesn't. - - // RELEASE SAVEPOINT itself can fail, in which case the entire - // transaction needs to be retried - if _, err := tx.Exec(ctx, sqlReleaseSavepoint); err != nil { - return err - } - return nil - } - - var i int - for i = 0; i <= maxRetries; i++ { - if err = releasedFn(currentTx); err != nil { - if retriable(ctx, err) { - if _, retryErr := currentTx.Exec(ctx, sqlRollback); retryErr != nil { - return i, RetryError{err: retryErr} - } - continue - } - return i, err - } - return i, nil - } - return i, errors.New(errReachedMaxRetry) -} - -// resetExecution attempts to rollback the given tx, begins a new tx with a new connection, and creates a savepoint. +// resetExecution attempts to rollback the given tx and begins a new tx with a new connection. func resetExecution(ctx context.Context, conn conn, tx pgx.Tx, txOptions pgx.TxOptions) (newTx pgx.Tx, err error) { log.Info().Msg("attempting to initialize new tx") if tx != nil { @@ -177,31 +109,26 @@ func resetExecution(ctx context.Context, conn conn, tx pgx.Tx, txOptions pgx.TxO if err != nil { return nil, err } - if _, err = newTx.Exec(ctx, sqlSavepoint); err != nil { - return nil, err - } return newTx, nil } -func retriable(ctx context.Context, err error) bool { - return sqlErrorCode(ctx, err) == crdbRetryErrCode -} - func resetable(ctx context.Context, err error) bool { sqlState := sqlErrorCode(ctx, err) // Ambiguous result error includes connection closed errors // https://www.cockroachlabs.com/docs/stable/common-errors.html#result-is-ambiguous return sqlState == crdbAmbiguousErrorCode || + // Reset for retriable errors + sqlState == crdbRetryErrCode || // Error encountered when crdb nodes have large clock skew (sqlState == crdbUnknownSQLState && strings.Contains(err.Error(), crdbClockSkewMessage)) } -// sqlErrorCode attenmpts to extract the crdb error code from the error state. +// sqlErrorCode attempts to extract the crdb error code from the error state. func sqlErrorCode(ctx context.Context, err error) string { var pgerr *pgconn.PgError if !errors.As(err, &pgerr) { - log.Info().Err(err).Msg("couldn't determine a sqlstate error code") + log.Debug().Err(err).Msg("couldn't determine a sqlstate error code") return "" } diff --git a/internal/datastore/crdb/tx_test.go b/internal/datastore/crdb/tx_test.go index 762d5d6884..e57f6f524d 100644 --- a/internal/datastore/crdb/tx_test.go +++ b/internal/datastore/crdb/tx_test.go @@ -25,13 +25,13 @@ const ( var testUserNS = namespace.Namespace(testUserNamespace) -func executeWithErrors(errors *[]pgconn.PgError, maxRetries int) executeTxRetryFunc { +func executeWithErrors(errors *[]error, maxRetries int) executeTxRetryFunc { return func(ctx context.Context, conn conn, txOptions pgx.TxOptions, fn transactionFn) (err error) { wrappedFn := func(tx pgx.Tx) error { if len(*errors) > 0 { retErr := (*errors)[0] (*errors) = (*errors)[1:] - return &retErr + return retErr } return fn(tx) @@ -45,17 +45,17 @@ func TestTxReset(t *testing.T) { cases := []struct { name string maxRetries int - errors []pgconn.PgError + errors []error expectError bool expectedError error }{ { name: "retryable", maxRetries: 4, - errors: []pgconn.PgError{ - {Code: crdbRetryErrCode}, - {Code: crdbRetryErrCode}, - {Code: crdbRetryErrCode}, + errors: []error{ + &pgconn.PgError{Code: crdbRetryErrCode}, + &pgconn.PgError{Code: crdbRetryErrCode}, + &pgconn.PgError{Code: crdbRetryErrCode}, }, expectError: false, expectedError: nil, @@ -63,10 +63,10 @@ func TestTxReset(t *testing.T) { { name: "resettable", maxRetries: 4, - errors: []pgconn.PgError{ - {Code: crdbAmbiguousErrorCode}, - {Code: crdbAmbiguousErrorCode}, - {Code: crdbAmbiguousErrorCode}, + errors: []error{ + &pgconn.PgError{Code: crdbAmbiguousErrorCode}, + &pgconn.PgError{Code: crdbAmbiguousErrorCode}, + &pgconn.PgError{Code: crdbAmbiguousErrorCode}, }, expectError: false, expectedError: nil, @@ -74,10 +74,10 @@ func TestTxReset(t *testing.T) { { name: "mixed", maxRetries: 50, - errors: []pgconn.PgError{ - {Code: crdbRetryErrCode}, - {Code: crdbAmbiguousErrorCode}, - {Code: crdbRetryErrCode}, + errors: []error{ + &pgconn.PgError{Code: crdbRetryErrCode}, + &pgconn.PgError{Code: crdbAmbiguousErrorCode}, + &pgconn.PgError{Code: crdbRetryErrCode}, }, expectError: false, expectedError: nil, @@ -85,16 +85,16 @@ func TestTxReset(t *testing.T) { { name: "noErrors", maxRetries: 50, - errors: []pgconn.PgError{}, + errors: []error{}, expectError: false, expectedError: nil, }, { name: "nonRecoverable", maxRetries: 1, - errors: []pgconn.PgError{ - {Code: crdbRetryErrCode}, - {Code: crdbAmbiguousErrorCode}, + errors: []error{ + &pgconn.PgError{Code: crdbRetryErrCode}, + &pgconn.PgError{Code: crdbAmbiguousErrorCode}, }, expectError: true, expectedError: errors.New(errReachedMaxRetry), @@ -102,8 +102,8 @@ func TestTxReset(t *testing.T) { { name: "clockSkew", maxRetries: 1, - errors: []pgconn.PgError{ - {Code: crdbUnknownSQLState, Message: crdbClockSkewMessage}, + errors: []error{ + &pgconn.PgError{Code: crdbUnknownSQLState, Message: crdbClockSkewMessage}, }, expectError: false, expectedError: nil,