Skip to content

Commit

Permalink
kv: unit test PrepareTransactionForRetry and TransactionRefreshTimestamp
Browse files Browse the repository at this point in the history
Informs #104233.

This commit adds a pair of new unit tests to verify the behavior of
`PrepareTransactionForRetry` and `TransactionRefreshTimestamp`. These
functions will be getting more complex for #104233, so it will be
helpful to have these tests in place. The tests also serve as good
documentation.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 10, 2023
1 parent c5b6392 commit a4005e3
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 18 deletions.
9 changes: 5 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,7 @@ func (tc *TxnCoordSender) UpdateStateOnRemoteRetryableErr(
// not be usable afterwards (in case of TransactionAbortedError). The caller is
// expected to check the ID of the resulting transaction. If the TxnCoordSender
// can still be used, it will have been prepared for a new epoch.
func (tc *TxnCoordSender) handleRetryableErrLocked(
ctx context.Context, pErr *kvpb.Error,
) *kvpb.TransactionRetryWithProtoRefreshError {
func (tc *TxnCoordSender) handleRetryableErrLocked(ctx context.Context, pErr *kvpb.Error) error {
// If the error is a transaction retry error, update metrics to
// reflect the reason for the restart. More details about the
// different error types are documented above on the metaRestart
Expand Down Expand Up @@ -842,7 +840,10 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
tc.metrics.RestartsUnknown.Inc()
}
errTxnID := pErr.GetTxn().ID
newTxn := kvpb.PrepareTransactionForRetry(ctx, pErr, tc.mu.userPriority, tc.clock)
newTxn, assertErr := kvpb.PrepareTransactionForRetry(pErr, tc.mu.userPriority, tc.clock)
if assertErr != nil {
return assertErr
}

// We'll pass a TransactionRetryWithProtoRefreshError up to the next layer.
retErr := kvpb.NewTransactionRetryWithProtoRefreshError(
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
srcs = [
"api_test.go",
"batch_test.go",
"data_test.go",
"errors_test.go",
"node_decommissioned_error_test.go",
"replica_unavailable_error_test.go",
Expand All @@ -74,6 +75,7 @@ go_test(
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
31 changes: 17 additions & 14 deletions pkg/kv/kvpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@
package kvpb

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// PrepareTransactionForRetry returns a new Transaction to be used for retrying
Expand All @@ -34,14 +32,18 @@ import (
// In case retryErr tells us that a new Transaction needs to be created,
// isolation and name help initialize this new transaction.
func PrepareTransactionForRetry(
ctx context.Context, pErr *Error, pri roachpb.UserPriority, clock *hlc.Clock,
) roachpb.Transaction {
pErr *Error, pri roachpb.UserPriority, clock *hlc.Clock,
) (roachpb.Transaction, error) {
if pErr == nil {
return roachpb.Transaction{}, errors.AssertionFailedf("nil error")
}
if pErr.TransactionRestart() == TransactionRestart_NONE {
log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"invalid retryable error (%T): %s", pErr.GetDetail(), pErr)
}

if pErr.GetTxn() == nil {
log.Fatalf(ctx, "missing txn for retryable error: %s", pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"missing txn for retryable error: %s", pErr)
}

txn := *pErr.GetTxn()
Expand Down Expand Up @@ -108,19 +110,20 @@ func PrepareTransactionForRetry(
// IntentMissingErrors are not expected to be handled at this level;
// We instead expect the txnPipeliner to transform them into a
// TransactionRetryErrors(RETRY_ASYNC_WRITE_FAILURE) error.
log.Fatalf(
ctx, "unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail(),
)
return roachpb.Transaction{}, errors.AssertionFailedf(
"unexpected intent missing error (%T); should be transformed into retry error", pErr.GetDetail())
default:
log.Fatalf(ctx, "invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"invalid retryable err (%T): %s", pErr.GetDetail(), pErr)
}
if !aborted {
if txn.Status.IsFinalized() {
log.Fatalf(ctx, "transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr)
return roachpb.Transaction{}, errors.AssertionFailedf(
"transaction unexpectedly finalized in (%T): %s", pErr.GetDetail(), pErr)
}
txn.Restart(pri, txn.Priority, txn.WriteTimestamp)
}
return txn
return txn, nil
}

// TransactionRefreshTimestamp returns whether the supplied error is a retry
Expand Down
233 changes: 233 additions & 0 deletions pkg/kv/kvpb/data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvpb

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestPrepareTransactionForRetry(t *testing.T) {
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
tsClock := hlc.Timestamp{WallTime: 3}
txn := roachpb.MakeTransaction("test", nil, isolation.Serializable, -1, ts1, 0, 99)
txn2ID := uuid.MakeV4() // used if txn is aborted
tests := []struct {
name string
err *Error
expTxn roachpb.Transaction
expErr bool
}{
{
name: "no error",
err: nil,
expErr: true,
},
{
name: "no txn",
err: NewError(errors.New("random")),
expErr: true,
},
{
name: "random error",
err: NewErrorWithTxn(errors.New("random"), &txn),
expErr: true,
},
{
name: "txn aborted error",
err: NewErrorWithTxn(&TransactionAbortedError{}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.ID = txn2ID
nextTxn.ReadTimestamp = tsClock
nextTxn.WriteTimestamp = tsClock
nextTxn.MinTimestamp = tsClock
nextTxn.LastHeartbeat = tsClock
nextTxn.GlobalUncertaintyLimit = tsClock
return nextTxn
}(),
},
{
name: "read within uncertainty error",
err: NewErrorWithTxn(&ReadWithinUncertaintyIntervalError{ValueTimestamp: ts2}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = ts2.Next()
nextTxn.WriteTimestamp = ts2.Next()
return nextTxn
}(),
},
{
name: "txn push error",
err: NewErrorWithTxn(&TransactionPushError{
PusheeTxn: roachpb.Transaction{TxnMeta: enginepb.TxnMeta{WriteTimestamp: ts2, Priority: 3}},
}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = ts2
nextTxn.WriteTimestamp = ts2
nextTxn.Priority = 2
return nextTxn
}(),
},
{
name: "txn retry error (reason: write too old)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_WRITE_TOO_OLD}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
return nextTxn
}(),
},
{
name: "txn retry error (reason: serializable)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_SERIALIZABLE}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = tsClock
nextTxn.WriteTimestamp = tsClock
return nextTxn
}(),
},
{
name: "write too old error",
err: NewErrorWithTxn(&WriteTooOldError{ActualTimestamp: ts2}, &txn),
expTxn: func() roachpb.Transaction {
nextTxn := txn
nextTxn.Epoch++
nextTxn.ReadTimestamp = ts2
nextTxn.WriteTimestamp = ts2
return nextTxn
}(),
},
{
name: "intent missing error",
err: NewErrorWithTxn(&IntentMissingError{}, &txn),
expErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, tsClock.WallTime)))
nextTxn, err := PrepareTransactionForRetry(tt.err, -1 /* pri */, clock)
if tt.expErr {
require.Error(t, err)
require.True(t, errors.IsAssertionFailure(err))
require.Zero(t, nextTxn)
} else {
require.NoError(t, err)
if nextTxn.ID != txn.ID {
// Eliminate randomness from ID generation.
nextTxn.ID = txn2ID
}
require.Equal(t, tt.expTxn, nextTxn)
}
})
}
}

func TestTransactionRefreshTimestamp(t *testing.T) {
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
txn := roachpb.MakeTransaction("test", nil, isolation.Serializable, 1, ts1, 0, 99)
tests := []struct {
name string
err *Error
expOk bool
expTs hlc.Timestamp
}{
{
name: "no error",
err: nil,
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "no txn",
err: NewError(errors.New("random")),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "random error",
err: NewErrorWithTxn(errors.New("random"), &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn aborted error",
err: NewErrorWithTxn(&TransactionAbortedError{}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn retry error (reason: unknown)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_REASON_UNKNOWN}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn retry error (reason: write too old)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_WRITE_TOO_OLD}, &txn),
expOk: true,
expTs: ts1,
},
{
name: "txn retry error (reason: serializable)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_SERIALIZABLE}, &txn),
expOk: true,
expTs: ts1,
},
{
name: "txn retry error (reason: async write failure)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_ASYNC_WRITE_FAILURE}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "txn retry error (reason: commit deadline exceeded)",
err: NewErrorWithTxn(&TransactionRetryError{Reason: RETRY_COMMIT_DEADLINE_EXCEEDED}, &txn),
expOk: false,
expTs: hlc.Timestamp{},
},
{
name: "write too old error",
err: NewErrorWithTxn(&WriteTooOldError{ActualTimestamp: ts2}, &txn),
expOk: true,
expTs: ts2,
},
{
name: "read within uncertainty error",
err: NewErrorWithTxn(&ReadWithinUncertaintyIntervalError{ValueTimestamp: ts2}, &txn),
expOk: true,
expTs: ts2.Next(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ok, ts := TransactionRefreshTimestamp(tt.err)
require.Equal(t, tt.expOk, ok)
require.Equal(t, tt.expTs, ts)
})
}
}

0 comments on commit a4005e3

Please sign in to comment.