Skip to content

Commit

Permalink
Consolidate crdb tx retry into resets
Browse files Browse the repository at this point in the history
  • Loading branch information
samkim committed Mar 24, 2022
1 parent f4f8147 commit 9eb9d4f
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 131 deletions.
22 changes: 11 additions & 11 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,18 @@ func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision,
ctx, span := tracer.Start(datastore.SeparateContextWithTracing(ctx), "HeadRevision")
defer span.End()

tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
}
defer tx.Rollback(ctx)

hlcNow, err := readCRDBNow(ctx, tx)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
}
var hlcNow datastore.Revision
err := cds.execute(ctx, cds.conn, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
var fnErr error
hlcNow, fnErr = readCRDBNow(ctx, tx)
if fnErr != nil {
hlcNow = datastore.NoRevision
return fmt.Errorf(errRevision, fnErr)
}
return nil
})

return hlcNow, nil
return hlcNow, err
}

func (cds *crdbDatastore) AddOverlapKey(keySet map[string]struct{}, namespace string) {
Expand Down
28 changes: 15 additions & 13 deletions internal/datastore/crdb/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,23 @@ func (cds *crdbDatastore) ReadNamespace(
) (*core.NamespaceDefinition, datastore.Revision, error) {
ctx = datastore.SeparateContextWithTracing(ctx)

tx, err := cds.conn.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}
defer tx.Rollback(ctx)

if err := prepareTransaction(ctx, tx, revision); err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}
var config *core.NamespaceDefinition
var timestamp time.Time
if err := cds.execute(ctx, cds.conn, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
var err error
if err := prepareTransaction(ctx, tx, revision); err != nil {
return fmt.Errorf(errUnableToReadConfig, err)
}

config, timestamp, err := loadNamespace(ctx, tx, nsName)
if err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return nil, datastore.NoRevision, err
config, timestamp, err = loadNamespace(ctx, tx, nsName)
if err != nil {
if errors.As(err, &datastore.ErrNamespaceNotFound{}) {
return err
}
return fmt.Errorf(errUnableToReadConfig, err)
}
return nil
}); err != nil {
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
}

Expand Down
99 changes: 13 additions & 86 deletions internal/datastore/crdb/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package crdb
import (
"context"
"errors"
"fmt"
"strings"

"github.com/jackc/pgconn"
Expand All @@ -23,26 +22,15 @@ const (
crdbClockSkewMessage = "cannot specify timestamp in the future"

errReachedMaxRetry = "maximum retries reached"

sqlRollback = "ROLLBACK TO SAVEPOINT cockroach_restart"
sqlSavepoint = "SAVEPOINT cockroach_restart"
sqlReleaseSavepoint = "RELEASE SAVEPOINT cockroach_restart"
)

var retryHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "crdb_client_retries",
Help: "cockroachdb client-side retry distribution",
Buckets: []float64{0, 1, 2, 5, 10, 20, 50},
})

var resetHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "crdb_client_resets",
Help: "cockroachdb client-side tx reset distribution",
Buckets: []float64{0, 1, 2, 5, 10, 20, 50},
})

func init() {
prometheus.MustRegister(retryHistogram)
prometheus.MustRegister(resetHistogram)
}

Expand All @@ -55,21 +43,6 @@ type transactionFn func(tx pgx.Tx) error

type executeTxRetryFunc func(context.Context, conn, pgx.TxOptions, transactionFn) error

// RetryError wraps an error that prevented the transaction function from being retried.
type RetryError struct {
err error
}

// Error returns the wrapped error
func (e RetryError) Error() string {
return fmt.Errorf("unable to retry conflicted transaction: %w", e.err).Error()
}

// Unwrap returns the wrapped, non-retriable error
func (e RetryError) Unwrap() error {
return e.err
}

func executeWithMaxRetries(max int) executeTxRetryFunc {
return func(ctx context.Context, conn conn, txOptions pgx.TxOptions, fn transactionFn) (err error) {
return executeWithResets(ctx, conn, txOptions, fn, max)
Expand All @@ -78,9 +51,8 @@ func executeWithMaxRetries(max int) executeTxRetryFunc {

// executeWithResets executes transactionFn and resets the tx when ambiguous crdb errors are encountered.
func executeWithResets(ctx context.Context, conn conn, txOptions pgx.TxOptions, fn transactionFn, maxRetries int) (err error) {
var retries, resets int
var resets int
defer func() {
retryHistogram.Observe(float64(retries))
resetHistogram.Observe(float64(resets))
}()

Expand All @@ -102,24 +74,19 @@ func executeWithResets(ctx context.Context, conn conn, txOptions pgx.TxOptions,
}()

// NOTE: n maxRetries can yield n+1 executions of the transaction fn
for retries = 0; retries <= maxRetries; retries++ {
for resets = 0; resets <= maxRetries; resets++ {
tx, err = resetExecution(ctx, conn, tx, txOptions)
if err != nil {
log.Err(err).Msg("error resetting transaction")
resets++
continue
}

retryAttempts, txErr := executeWithRetries(ctx, tx, fn, maxRetries-retries)
retries += retryAttempts
if txErr == nil {
return
if resetable(ctx, err) {
continue
} else {
return
}
}
err = txErr

if resetable(ctx, err) {
if err = fn(tx); resetable(ctx, err) {
log.Err(err).Msg("resettable error, will attempt to reset tx")
resets++
continue
}

Expand All @@ -129,42 +96,7 @@ func executeWithResets(ctx context.Context, conn conn, txOptions pgx.TxOptions,
return
}

// executeWithRetries executes the transaction fn and attempts to retry up to maxRetries.
// adapted from https://github.com/cockroachdb/cockroach-go/blob/05d7aaec086fe3288377923bf9a98648d29c44c6/crdb/tx.go#L95
func executeWithRetries(ctx context.Context, currentTx pgx.Tx, fn transactionFn, maxRetries int) (retries int, err error) {
releasedFn := func(tx pgx.Tx) error {
if err := fn(tx); err != nil {
return err
}

// RELEASE acts like COMMIT in CockroachDB. We use it since it gives us an
// opportunity to react to retryable errors, whereas tx.Commit() doesn't.

// RELEASE SAVEPOINT itself can fail, in which case the entire
// transaction needs to be retried
if _, err := tx.Exec(ctx, sqlReleaseSavepoint); err != nil {
return err
}
return nil
}

var i int
for i = 0; i <= maxRetries; i++ {
if err = releasedFn(currentTx); err != nil {
if retriable(ctx, err) {
if _, retryErr := currentTx.Exec(ctx, sqlRollback); retryErr != nil {
return i, RetryError{err: retryErr}
}
continue
}
return i, err
}
return i, nil
}
return i, errors.New(errReachedMaxRetry)
}

// resetExecution attempts to rollback the given tx, begins a new tx with a new connection, and creates a savepoint.
// resetExecution attempts to rollback the given tx and begins a new tx with a new connection.
func resetExecution(ctx context.Context, conn conn, tx pgx.Tx, txOptions pgx.TxOptions) (newTx pgx.Tx, err error) {
log.Info().Msg("attempting to initialize new tx")
if tx != nil {
Expand All @@ -177,31 +109,26 @@ func resetExecution(ctx context.Context, conn conn, tx pgx.Tx, txOptions pgx.TxO
if err != nil {
return nil, err
}
if _, err = newTx.Exec(ctx, sqlSavepoint); err != nil {
return nil, err
}

return newTx, nil
}

func retriable(ctx context.Context, err error) bool {
return sqlErrorCode(ctx, err) == crdbRetryErrCode
}

func resetable(ctx context.Context, err error) bool {
sqlState := sqlErrorCode(ctx, err)
// Ambiguous result error includes connection closed errors
// https://www.cockroachlabs.com/docs/stable/common-errors.html#result-is-ambiguous
return sqlState == crdbAmbiguousErrorCode ||
// Reset for retriable errors
sqlState == crdbRetryErrCode ||
// Error encountered when crdb nodes have large clock skew
(sqlState == crdbUnknownSQLState && strings.Contains(err.Error(), crdbClockSkewMessage))
}

// sqlErrorCode attenmpts to extract the crdb error code from the error state.
// sqlErrorCode attempts to extract the crdb error code from the error state.
func sqlErrorCode(ctx context.Context, err error) string {
var pgerr *pgconn.PgError
if !errors.As(err, &pgerr) {
log.Info().Err(err).Msg("couldn't determine a sqlstate error code")
log.Debug().Err(err).Msg("couldn't determine a sqlstate error code")
return ""
}

Expand Down
42 changes: 21 additions & 21 deletions internal/datastore/crdb/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ const (

var testUserNS = namespace.Namespace(testUserNamespace)

func executeWithErrors(errors *[]pgconn.PgError, maxRetries int) executeTxRetryFunc {
func executeWithErrors(errors *[]error, maxRetries int) executeTxRetryFunc {
return func(ctx context.Context, conn conn, txOptions pgx.TxOptions, fn transactionFn) (err error) {
wrappedFn := func(tx pgx.Tx) error {
if len(*errors) > 0 {
retErr := (*errors)[0]
(*errors) = (*errors)[1:]
return &retErr
return retErr
}

return fn(tx)
Expand All @@ -45,65 +45,65 @@ func TestTxReset(t *testing.T) {
cases := []struct {
name string
maxRetries int
errors []pgconn.PgError
errors []error
expectError bool
expectedError error
}{
{
name: "retryable",
maxRetries: 4,
errors: []pgconn.PgError{
{Code: crdbRetryErrCode},
{Code: crdbRetryErrCode},
{Code: crdbRetryErrCode},
errors: []error{
&pgconn.PgError{Code: crdbRetryErrCode},
&pgconn.PgError{Code: crdbRetryErrCode},
&pgconn.PgError{Code: crdbRetryErrCode},
},
expectError: false,
expectedError: nil,
},
{
name: "resettable",
maxRetries: 4,
errors: []pgconn.PgError{
{Code: crdbAmbiguousErrorCode},
{Code: crdbAmbiguousErrorCode},
{Code: crdbAmbiguousErrorCode},
errors: []error{
&pgconn.PgError{Code: crdbAmbiguousErrorCode},
&pgconn.PgError{Code: crdbAmbiguousErrorCode},
&pgconn.PgError{Code: crdbAmbiguousErrorCode},
},
expectError: false,
expectedError: nil,
},
{
name: "mixed",
maxRetries: 50,
errors: []pgconn.PgError{
{Code: crdbRetryErrCode},
{Code: crdbAmbiguousErrorCode},
{Code: crdbRetryErrCode},
errors: []error{
&pgconn.PgError{Code: crdbRetryErrCode},
&pgconn.PgError{Code: crdbAmbiguousErrorCode},
&pgconn.PgError{Code: crdbRetryErrCode},
},
expectError: false,
expectedError: nil,
},
{
name: "noErrors",
maxRetries: 50,
errors: []pgconn.PgError{},
errors: []error{},
expectError: false,
expectedError: nil,
},
{
name: "nonRecoverable",
maxRetries: 1,
errors: []pgconn.PgError{
{Code: crdbRetryErrCode},
{Code: crdbAmbiguousErrorCode},
errors: []error{
&pgconn.PgError{Code: crdbRetryErrCode},
&pgconn.PgError{Code: crdbAmbiguousErrorCode},
},
expectError: true,
expectedError: errors.New(errReachedMaxRetry),
},
{
name: "clockSkew",
maxRetries: 1,
errors: []pgconn.PgError{
{Code: crdbUnknownSQLState, Message: crdbClockSkewMessage},
errors: []error{
&pgconn.PgError{Code: crdbUnknownSQLState, Message: crdbClockSkewMessage},
},
expectError: false,
expectedError: nil,
Expand Down

0 comments on commit 9eb9d4f

Please sign in to comment.