Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: don't perform one-phase commit transactions after restarts #40518

Merged
merged 5 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2464,9 +2464,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// Parallel commits do not support the canForwardSerializableTimestamp
// optimization. That's ok because we need to removed that optimization
// anyway. See #36431.
txnCoordRetry: true,
},
{
Expand Down Expand Up @@ -2549,9 +2546,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry
},
// Parallel commits do not support the canForwardSerializableTimestamp
// optimization. That's ok because we need to removed that optimization
// anyway. See #36431.
txnCoordRetry: true,
},
{
Expand Down
28 changes: 28 additions & 0 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,34 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
verifyCleanup(key, s.Eng, t, txn1.Sender().(*TxnCoordSender), txn2.Sender().(*TxnCoordSender))
}

// TestTxnCoordSenderCleanupOnCommitAfterRestart verifies that if a txn restarts
// at a higher epoch and then commits before it has written anything in the new
// epoch, the coordinator still cleans up the transaction. In #40466, we saw that
// this case could be detected as a 1PC transaction and the cleanup during the
// commit could be omitted.
func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
defer leaktest.AfterTest(t)()
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()

// Create a transaction with intent at "a".
key := roachpb.Key("a")
txn := client.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */, client.RootTxn)
if err := txn.Put(ctx, key, []byte("value")); err != nil {
t.Fatal(err)
}

// Restart the transaction with a new epoch.
txn.ManualRestart(ctx, s.Clock.Now())

// Now immediately commit.
if err := txn.CommitOrCleanup(ctx); err != nil {
t.Fatal(err)
}
verifyCleanup(key, s.Eng, t, txn.Sender().(*TxnCoordSender))
}

// TestTxnCoordSenderGCWithAmbiguousResultErr verifies that the coordinator
// cleans up extant transactions and intents after an ambiguous result error is
// observed, even if the error is on the first request.
Expand Down
33 changes: 17 additions & 16 deletions pkg/storage/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,33 +402,34 @@ func IsEndTransactionTriggeringRetryError(
}

// A transaction can still avoid a retry under certain conditions.
if retry && canForwardSerializableTimestamp(txn, args.NoRefreshSpans) {
if retry && CanForwardCommitTimestampWithoutRefresh(txn, args) {
retry, reason = false, 0
}

if !retry {
if IsEndTransactionExceedingDeadline(txn.Timestamp, args) {
exceededBy := txn.Timestamp.GoTime().Sub(args.Deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg = fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, args.Deadline, fromStart, txn.OrigTimestamp)
retry, reason = true, roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED
}
// However, a transaction must obey its deadline, if set.
if !retry && IsEndTransactionExceedingDeadline(txn.Timestamp, args) {
exceededBy := txn.Timestamp.GoTime().Sub(args.Deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg = fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, args.Deadline, fromStart, txn.OrigTimestamp)
retry, reason = true, roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED
}

return retry, reason, extraMsg
}

// canForwardSerializableTimestamp returns whether a serializable txn can
// be safely committed with a forwarded timestamp. This requires that
// CanForwardCommitTimestampWithoutRefresh returns whether a txn can be
// safely committed with a timestamp above its read timestamp without
// requiring a read refresh (see txnSpanRefresher). This requires that
// the transaction's timestamp has not leaked and that the transaction
// has encountered no spans which require refreshing at the forwarded
// timestamp. If either of those conditions are true, a client-side
// retry is required.
func canForwardSerializableTimestamp(txn *roachpb.Transaction, noRefreshSpans bool) bool {
return !txn.OrigTimestampWasObserved && noRefreshSpans
func CanForwardCommitTimestampWithoutRefresh(
txn *roachpb.Transaction, args *roachpb.EndTransactionRequest,
) bool {
return !txn.OrigTimestampWasObserved && args.NoRefreshSpans
}

const intentResolutionBatchSize = 500
Expand Down
123 changes: 94 additions & 29 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,50 +521,115 @@ func TestMaybeStripInFlightWrites(t *testing.T) {
// transactional batch can be committed as an atomic write.
func TestIsOnePhaseCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
txnReqs := make([]roachpb.RequestUnion, 3)
txnReqs[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqs[1].MustSetInner(&roachpb.PutRequest{})
txnReqs[2].MustSetInner(&roachpb.EndTransactionRequest{Commit: true})
txnReqsNoRefresh := make([]roachpb.RequestUnion, 3)
txnReqsNoRefresh[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqsNoRefresh[1].MustSetInner(&roachpb.PutRequest{})
txnReqsNoRefresh[2].MustSetInner(&roachpb.EndTransactionRequest{Commit: true, NoRefreshSpans: true})
withSeq := func(req roachpb.Request, seq enginepb.TxnSeq) roachpb.Request {
h := req.Header()
h.Sequence = seq
req.SetHeader(h)
return req
}
makeReqs := func(reqs ...roachpb.Request) []roachpb.RequestUnion {
ru := make([]roachpb.RequestUnion, len(reqs))
for i, r := range reqs {
ru[i].MustSetInner(r)
}
return ru
}

noReqs := makeReqs()
getReq := makeReqs(withSeq(&roachpb.GetRequest{}, 0))
putReq := makeReqs(withSeq(&roachpb.PutRequest{}, 1))
etReq := makeReqs(withSeq(&roachpb.EndTransactionRequest{Commit: true}, 1))
txnReqs := makeReqs(
withSeq(&roachpb.BeginTransactionRequest{}, 0),
withSeq(&roachpb.PutRequest{}, 1),
withSeq(&roachpb.EndTransactionRequest{Commit: true}, 2),
)
txnReqsNoRefresh := makeReqs(
withSeq(&roachpb.BeginTransactionRequest{}, 0),
withSeq(&roachpb.PutRequest{}, 1),
withSeq(&roachpb.EndTransactionRequest{Commit: true, NoRefreshSpans: true}, 2),
)
txnReqsRequire1PC := makeReqs(
withSeq(&roachpb.BeginTransactionRequest{}, 0),
withSeq(&roachpb.PutRequest{}, 1),
withSeq(&roachpb.EndTransactionRequest{Commit: true, Require1PC: true}, 2),
)

testCases := []struct {
bu []roachpb.RequestUnion
isTxn bool
isWTO bool
isTSOff bool
exp1PC bool
ru []roachpb.RequestUnion
isTxn bool
isRestarted bool
isWTO bool
isTSOff bool
exp1PC bool
}{
{[]roachpb.RequestUnion{}, false, false, false, false},
{[]roachpb.RequestUnion{}, true, false, false, false},
{[]roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{Get: &roachpb.GetRequest{}}}}, true, false, false, false},
{[]roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Put{Put: &roachpb.PutRequest{}}}}, true, false, false, false},
{txnReqs[0 : len(txnReqs)-1], true, false, false, false},
{txnReqs[1:], true, false, false, false},
{txnReqs, true, false, false, true},
{txnReqs, true, true, false, false},
{txnReqs, true, false, true, false},
{txnReqs, true, true, true, false},
{txnReqsNoRefresh, true, false, false, true},
{txnReqsNoRefresh, true, true, false, true},
{txnReqsNoRefresh, true, false, true, true},
{txnReqsNoRefresh, true, true, true, true},
{ru: noReqs, isTxn: false, exp1PC: false},
{ru: noReqs, isTxn: true, exp1PC: false},
{ru: getReq, isTxn: true, exp1PC: false},
{ru: putReq, isTxn: true, exp1PC: false},
{ru: etReq, isTxn: true, exp1PC: true},
{ru: etReq, isTxn: true, isTSOff: true, exp1PC: false},
{ru: etReq, isTxn: true, isWTO: true, exp1PC: false},
{ru: etReq, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqs[0:2], isTxn: true, exp1PC: false},
{ru: txnReqs[1:], isTxn: true, exp1PC: true},
{ru: txnReqs[2:], isTxn: true, exp1PC: false},
{ru: txnReqs, isTxn: true, exp1PC: true},
{ru: txnReqs, isTxn: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isWTO: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsNoRefresh[0:2], isTxn: true, exp1PC: false},
{ru: txnReqsNoRefresh[1:], isTxn: true, exp1PC: true},
{ru: txnReqsNoRefresh[2:], isTxn: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isTSOff: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isWTO: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isWTO: true, isTSOff: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC[0:2], isTxn: true, exp1PC: false},
{ru: txnReqsRequire1PC[1:], isTxn: true, exp1PC: true},
{ru: txnReqsRequire1PC[2:], isTxn: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, exp1PC: true},
{ru: txnReqsRequire1PC, isTxn: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isWTO: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, exp1PC: true},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
}

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
for i, c := range testCases {
ba := roachpb.BatchRequest{Requests: c.bu}
ba := roachpb.BatchRequest{Requests: c.ru}
if c.isTxn {
ba.Txn = newTransaction("txn", roachpb.Key("a"), 1, clock)
if c.isRestarted {
ba.Txn.Restart(-1, 0, clock.Now())
}
if c.isWTO {
ba.Txn.WriteTooOld = true
}
if c.isTSOff {
ba.Txn.Timestamp = ba.Txn.OrigTimestamp.Add(1, 0)
}
} else {
require.False(t, c.isRestarted)
require.False(t, c.isWTO)
require.False(t, c.isTSOff)
}
if is1PC := isOnePhaseCommit(&ba, &StoreTestingKnobs{}); is1PC != c.exp1PC {
if is1PC := isOnePhaseCommit(&ba); is1PC != c.exp1PC {
t.Errorf("%d: expected 1pc=%t; got %t", i, c.exp1PC, is1PC)
}
}
Expand Down
47 changes: 28 additions & 19 deletions pkg/storage/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *Replica) evaluateWriteBatch(
ms := enginepb.MVCCStats{}
// If not transactional or there are indications that the batch's txn will
// require restart or retry, execute as normal.
if isOnePhaseCommit(ba, r.store.TestingKnobs()) {
if isOnePhaseCommit(ba) {
_, hasBegin := ba.GetArg(roachpb.BeginTransaction)
arg, _ := ba.GetArg(roachpb.EndTransaction)
etArg := arg.(*roachpb.EndTransactionRequest)
Expand All @@ -277,18 +277,18 @@ func (r *Replica) evaluateWriteBatch(
strippedBa.Requests = ba.Requests[:len(ba.Requests)-1] // strip end txn req
}

// If there were no refreshable spans earlier in the txn
// (e.g. earlier gets or scans), then the batch can be retried
// locally in the event of write too old errors.
retryLocally := etArg.NoRefreshSpans && !ba.Txn.OrigTimestampWasObserved
// Is the transaction allowed to retry locally in the event of
// write too old errors? This is only allowed if it is able to
// forward its commit timestamp without a read refresh.
canForwardTimestamp := batcheval.CanForwardCommitTimestampWithoutRefresh(ba.Txn, etArg)

// If all writes occurred at the intended timestamp, we've succeeded on the fast path.
rec := NewReplicaEvalContext(r, spans)
batch, br, res, pErr := r.evaluateWriteBatchWithLocalRetries(
ctx, idKey, rec, &ms, &strippedBa, spans, retryLocally,
ctx, idKey, rec, &ms, &strippedBa, spans, canForwardTimestamp,
)
if pErr == nil && (ba.Timestamp == br.Timestamp ||
(retryLocally && !batcheval.IsEndTransactionExceedingDeadline(br.Timestamp, etArg))) {
(canForwardTimestamp && !batcheval.IsEndTransactionExceedingDeadline(br.Timestamp, etArg))) {
clonedTxn := ba.Txn.Clone()
clonedTxn.Status = roachpb.COMMITTED
// Make sure the returned txn has the actual commit
Expand Down Expand Up @@ -431,14 +431,15 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries(
return
}

// isOnePhaseCommit returns true iff the BatchRequest contains all commands in
// the transaction, starting with BeginTransaction and ending with
// EndTransaction. One phase commits are disallowed if (1) the transaction has
// already been flagged with a write too old error, or (2) if isolation is
// serializable and the commit timestamp has been forwarded, or (3) the
// transaction exceeded its deadline, or (4) the testing knobs disallow optional
// one phase commits and the BatchRequest does not require one phase commit.
func isOnePhaseCommit(ba *roachpb.BatchRequest, knobs *StoreTestingKnobs) bool {
// isOnePhaseCommit returns true iff the BatchRequest contains all writes in the
// transaction and ends with an EndTransaction. One phase commits are disallowed
// if any of the following conditions are true:
// (1) the transaction has already been flagged with a write too old error
// (2) the transaction's commit timestamp has been forwarded
// (3) the transaction exceeded its deadline
// (4) the transaction is not in its first epoch and the EndTransaction request
// does not require one phase commit.
func isOnePhaseCommit(ba *roachpb.BatchRequest) bool {
if ba.Txn == nil {
return false
}
Expand All @@ -447,13 +448,21 @@ func isOnePhaseCommit(ba *roachpb.BatchRequest, knobs *StoreTestingKnobs) bool {
}
arg, _ := ba.GetArg(roachpb.EndTransaction)
etArg := arg.(*roachpb.EndTransactionRequest)
if batcheval.IsEndTransactionExceedingDeadline(ba.Txn.Timestamp, etArg) {
return false
}
if retry, _, _ := batcheval.IsEndTransactionTriggeringRetryError(ba.Txn, etArg); retry {
return false
}
return !knobs.DisableOptional1PC || etArg.Require1PC
// If the transaction has already restarted at least once then it may have
// left intents at prior epochs that need to be cleaned up during the
// process of committing the transaction. Even if the current epoch could
// perform a one phase commit, we don't allow it to because that could
// prevent it from properly resolving intents from prior epochs and cause
// it to abandon them instead.
//
// The exception to this rule is transactions that require a one phase
// commit. We know that if they also required a one phase commit in past
// epochs then they couldn't have left any intents that they now need to
// clean up.
return ba.Txn.Epoch == 0 || etArg.Require1PC
}

// maybeStripInFlightWrites attempts to remove all point writes and query
Expand Down
5 changes: 0 additions & 5 deletions pkg/storage/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ type StoreTestingKnobs struct {
// error returned to the client, or to simulate network failures.
TestingResponseFilter storagebase.ReplicaResponseFilter

// Disables the use of optional one phase commits. Even when enabled, requests
// that set the Require1PC flag are permitted to use one phase commits. This
// prevents wedging node liveness, which requires one phase commits during
// liveness updates.
DisableOptional1PC bool
// A hack to manipulate the clock before sending a batch request to a replica.
// TODO(kaneda): This hook is not encouraged to use. Get rid of it once
// we make TestServer take a ManualClock.
Expand Down