Skip to content

Commit

Permalink
storage: leave intents behind after blind-writes experiencing write-t…
Browse files Browse the repository at this point in the history
…oo-old

Before this patch, any write running into a write-too-old condition
resulted in a WriteTooOldError being returned by the server. Returning
an error implies that no intents are left behind. This is unfortunate;
we'd like to leave intents (or, in the future, other types of locks)
behind so as to keep other transactions away. We've observed this resulting in
the starvation of a class of transactions in a user's workload.

This patch makes it so that blind writes (i.e. Puts - used by UPDATE,
not CPuts) don't return WriteTooOldErrors any more. Instead, they return
a txn proto with the WriteTooOld flag set. This is the behavior they
had before cockroachdb#38668. This patch retains the goal of cockroachdb#38668, however: the
client now eagerly refreshes the transactions when it sees a WriteTooOld
flag, and if the refresh succeeds, it returns a WriteTooOldError to the
higher layers (SQL), allowing for automatic retries where applicable.

Unfortunately, CPuts (used by INSERT) continue to return
WriteTooOldErrors without leaving locks behind. Dealing with them
requires more tenderness because they imply a read, and the timestamp of
a read cannot be bumped as easily as that of a write.

Touches cockroachdb#44653

Release note (SQL change): UPDATEs returning a serialization failure error (code
40001) now leave behind a lock, helping the transaction succeed if it
retries. This prevents starvation of transactions whose UPDATEs are
prone to conflicts.
  • Loading branch information
andreimatei committed Feb 6, 2020
1 parent e989c21 commit 02c3c16
Show file tree
Hide file tree
Showing 14 changed files with 771 additions and 875 deletions.
55 changes: 13 additions & 42 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 5 additions & 26 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,10 @@ func splitBatchAndCheckForRefreshSpans(
// succeeded. Where possible, the caller may be able to update spans
// encountered in the transaction and retry just the EndTxn request to
// avoid client-side serializable txn retries.
// TODO(andrei,nvanbenschoten): Get rid of this returning both error and result,
// and change the span txnSpanRefresher to not look for partial results. It's
// probably better for the span refresher to refresh less (i.e. don't refresh
// the partial successes) and retry the whole batch.
func (ds *DistSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
Expand Down
69 changes: 2 additions & 67 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,19 +1772,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
txnCoordRetry: true,
},
{
name: "deferred write too old with put",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "put")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.Put("a", "put")
return txn.Run(ctx, b)
},
// This trivially succeeds as there are no refresh spans.
},
{
name: "write too old with put timestamp leaked",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2149,23 +2136,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
txnCoordRetry: true,
},
{
name: "multi-range batch with deferred write too old",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "c", "value")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.Put("a", "put")
b.Put("c", "put")
// Both sub-batches will succeed, but the Put(a) will return a pushed
// timestamp, which is turned into a retriable error by the txnCommitter
// interceptor (because it's concurrent with writing the STAGING record).
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: true,
},
{
name: "multi-range batch with write too old and failed cput",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2200,42 +2170,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "multi-range batch with deferred write too old and failed cput",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "value")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.CPut("a", "cput", strToValue("orig"))
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
txnCoordRetry: false, // non-matching value means we fail txn coord retry
expFailure: "unexpected value", // the failure we get is a condition failed error
},
{
name: "multi-range batch with deferred write too old and successful cput",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
afterTxnStart: func(ctx context.Context, db *client.DB) error {
return db.Put(ctx, "a", "orig")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Header.DeferWriteTooOldError = true
b.CPut("a", "cput", strToValue("orig"))
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "cput within uncertainty interval",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down Expand Up @@ -2470,7 +2404,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
metrics = txn.Sender().(*kv.TxnCoordSender).TxnCoordSenderFactory.Metrics()
lastAutoRetries = metrics.AutoRetries.Count()

return tc.retryable(ctx, txn)
err := tc.retryable(ctx, txn)
return err
}); err != nil {
if len(tc.expFailure) == 0 || !testutils.IsError(err, tc.expFailure) {
t.Fatal(err)
Expand Down
60 changes: 58 additions & 2 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,57 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
ctx context.Context, ba roachpb.BatchRequest, maxRefreshAttempts int,
) (_ *roachpb.BatchResponse, _ *roachpb.Error, largestRefreshTS hlc.Timestamp) {
br, pErr := sr.sendHelper(ctx, ba)
if pErr == nil && br.Txn.WriteTooOld {
// If we got a response with the WriteTooOld flag set, then we pretend that
// we got a WriteTooOldError, which will cause us to attempt to refresh and
// propagate the error if we failed. When it can, the server prefers to
// return the WriteTooOld flag, rather than a WriteTooOldError because, in
// the former case, it can leave intents behind. We like refreshing eagerly
// when the WriteTooOld flag is set because it's likely that the refresh
// will fail (if we previously read the key that's now causing a WTO, then
// the refresh will surely fail).
// TODO(andrei): Implement a more discerning policy based on whether we've
// read that key before.
//
// If the refresh fails, we could continue running the transaction even
// though it will not be able to commit, in order for it to lay down more
// intents. Not doing so, though, gives the SQL a chance to auto-retry.
// TODO(andrei): Implement a more discerning policy based on whether
// auto-retries are still possible.
//
// For the refresh, we have two options: either refresh everything read
// *before* this batch, and then retry the batch, or refresh the current
// batch's reads too and then, if successful, there'd be nothing to refresh.
// We take the former option by setting br = nil below to minimize the
// chances that the refresh fails.
bumpedTxn := br.Txn.Clone()
bumpedTxn.WriteTooOld = false
bumpedTxn.ReadTimestamp = bumpedTxn.WriteTimestamp
// Synthesize a retriable error carrying the bumped txn so the refresh
// machinery below treats this exactly like a server-returned WTO error.
pErr = roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_WRITE_TOO_OLD, ""),
bumpedTxn)
br = nil
}
if pErr != nil && maxRefreshAttempts > 0 {
br, pErr, largestRefreshTS = sr.maybeRetrySend(ctx, ba, br, pErr, maxRefreshAttempts)
}
if pErr != nil {
// Don't confuse layers above with both a result and an error. This layer is
// the last one that benefits from looking at partial results.
br = nil
} else {
// Terminate the txn.WriteTooOld flag here. We failed to refresh it away, so
// turn it into a retriable error.
if br.Txn.WriteTooOld {
newTxn := br.Txn.Clone()
newTxn.WriteTooOld = false
newTxn.ReadTimestamp = newTxn.WriteTimestamp
pErr = roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_WRITE_TOO_OLD, ""),
newTxn)
br = nil
}
}
return br, pErr, largestRefreshTS
}

Expand All @@ -256,9 +304,17 @@ func (sr *txnSpanRefresher) maybeRetrySend(
// case is a batch split between everything up to but not including
// the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
// fails with a retryable error. We want to retry only the EndTxn.
// TODO(andrei): This attempt to only retry part of the request is probably a bad idea.
// We try to refresh more (including the successful part of the current
// request) and retry less, but it'd probably be a better idea to refresh less
// and retry more - thereby trading the cost of some evaluation for a higher
// chance that the refresh succeeds.
ba.UpdateTxn(retryTxn)
retryBa := ba
if br != nil {
// If br came back with the WriteTooOld flag set, then we need to refresh the
// whole request; we don't know which part of the request encountered the
// write too old condition.
if br != nil && !br.Txn.WriteTooOld {
doneBa := ba
doneBa.Requests = ba.Requests[:len(br.Responses)]
log.VEventf(ctx, 2, "collecting refresh spans after partial batch execution of %s", doneBa)
Expand All @@ -273,7 +329,7 @@ func (sr *txnSpanRefresher) maybeRetrySend(

// Try updating the txn spans so we can retry.
if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
return nil, pErr, hlc.Timestamp{}
return br, pErr, hlc.Timestamp{}
}

// We've refreshed all of the read spans successfully and set
Expand Down
Loading

0 comments on commit 02c3c16

Please sign in to comment.