Skip to content

Commit

Permalink
kv: give non-transactional requests uncertainty intervals
Browse files Browse the repository at this point in the history
Fixes cockroachdb#58459.
Informs cockroachdb#36431.

This commit fixes a long-standing correctness issue where non-transactional
requests did not ensure single-key linearizability even if they deferred their
timestamp allocation to the leaseholder of their (single) range. They still
don't entirely, because of cockroachdb#36431, but this change brings us one step closer to
the fix we plan to land for cockroachdb#36431 also applying to non-transactional requests.

The change addresses this by giving non-transactional requests uncertainty
intervals. This ensures that they also guarantee single-key linearizability even
with only loose (but bounded) clock synchronization. Non-transactional requests
that use a client-provided timestamp do not have uncertainty intervals and do
not make real-time ordering guarantees.

Unlike transactions, which establish an uncertainty interval on their
coordinator node during initialization, non-transactional requests receive
uncertainty intervals from their target leaseholder, using a clock reading
from the leaseholder's local HLC as the local limit and this clock reading +
the cluster's maximum clock skew as the global limit.

It is somewhat non-intuitive that non-transactional requests need uncertainty
intervals — after all, they receive their timestamp to the leaseholder of the
only range that they talk to, so isn't every value with a commit timestamp
above their read timestamp certainly concurrent? The answer is surprisingly
"no" for the following reasons, so they cannot forgo the use of uncertainty
interval:

1. the request timestamp is allocated before consulting the replica's lease.
   This means that there are times when the replica is not the leaseholder at
   the point of timestamp allocation, and only becomes the leaseholder later.
   In such cases, the timestamp assigned to the request is not guaranteed to
   be greater than the written_timestamp of all writes served by the range at
   the time of allocation. This is true despite invariants 1 & 2 from above,
   because the replica allocating the timestamp is not yet the leaseholder.

   In cases where the replica that assigned the non-transactional request's
   timestamp takes over as the leaseholder after the timestamp allocation, we
   expect minimumLocalLimitForLeaseholder to forward the local uncertainty
   limit above TimestampFromServerClock, to the lease start time.

2. ignoring reason #1, the request timestamp assigned by the leaseholder is
   equal to its local uncertainty limit and is guaranteed to lead the
   written_timestamp of all writes served by the range at the time the
   request arrived at the leaseholder node. However, this timestamp is still
   not guaranteed to lead the commit timestamp of all of these writes. As a
   result, the non-transactional request needs an uncertainty interval with a
   global uncertainty limit far enough in advance of the local HLC clock to
   ensure that it considers any value that was part of a transaction which
   could have committed before the request was received by the leaseholder to
   be uncertain. Concretely, the non-transactional request needs to consider
   values of the following form to be uncertain:

     written_timestamp < local_limit && commit_timestamp < global_limit

   The value that the non-transactional request is observing may have been
   written on the local leaseholder at time 10, its transaction may have been
   committed remotely at time 20, acknowledged, then the non-transactional
   request may have begun and received a timestamp of 15 from the local
   leaseholder, then finally the value may have been resolved asynchronously
   and moved to timestamp 20 (written_timestamp: 10, commit_timestamp: 20).
   The failure of the non-transactional request to observe this value would
   be a stale read.

----

Conveniently, because non-transactional requests are always scoped to a
single-range, those that hit uncertainty errors can always retry on the server,
so these errors never bubble up to the client that initiated the request.

However, before this commit, this wasn't actually possible because we did not
support server-side retries of `ReadWithinUncertaintyIntervalError`. The second
part of this commit adds support for this. This new support allows us to make
the guarantee we want about non-transactional requests never returning these
errors to the client, even though they use uncertainty intervals. It also serves
as a performance optimization for transactional requests, which now benefit from
this new capability. There's some complexity around supporting this form of
server-side retry, because it must be done above latching instead of below, but
recent refactors in cockroachdb#73557 and cockroachdb#73717 have made this possible to support
cleanly.

----

This change is related to cockroachdb#36431 in two ways. First, it allows non-transactional
requests to benefit from our incoming fix for that issue. Second, it unblocks
some of the clock refactors proposed in cockroachdb#72121 (comment),
and by extension cockroachdb#72663. Even though there are clear bugs today, I still don't
feel comfortable removing the `hlc.Clock.Update` in `Store.Send` until we make
this change. We know from cockroachdb#36431 that invariant 1 from
[`uncertainty.D6`](https://github.com/cockroachdb/cockroach/blob/22df0a6658a927e7b65dafbdfc9d790500791993/pkg/kv/kvserver/uncertainty/doc.go#L131)
doesn't hold, and yet I still do think the `hlc.Clock.Update` in `Store.Send`
masks the bugs in this area in many cases. Removing that clock update (I don't
actually plan to remove it, but I plan to disconnect it entirely from operation
timestamps) without first giving non-transactional requests uncertainty intervals
seems like it may reveal these bugs in ways we haven't seen in the past. So I'd
like to land this fix before making that change.

----

Release note (performance improvement): Certain forms of automatically retried
"read uncertainty" errors are now retried more efficiently, avoiding a network
round trip.
  • Loading branch information
nvanbenschoten committed Dec 13, 2021
1 parent 644d144 commit 64eb718
Show file tree
Hide file tree
Showing 19 changed files with 1,616 additions and 159 deletions.
27 changes: 14 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2185,9 +2185,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.Run(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
// Even if accounting for the refresh spans would have exhausted the
Expand Down Expand Up @@ -2875,8 +2875,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
txnCoordRetry: true,
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "cput within uncertainty interval after timestamp leaked",
Expand All @@ -2886,7 +2887,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
filter: newUncertaintyFilter(roachpb.Key("a")),
clientRetry: true,
tsLeaked: true,
},
Expand All @@ -2907,7 +2908,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
filter: newUncertaintyFilter(roachpb.Key("ac")),
txnCoordRetry: true,
},
{
Expand All @@ -2930,7 +2931,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
filter: newUncertaintyFilter(roachpb.Key("ac")),
clientRetry: true, // note this txn is read-only but still restarts
},
{
Expand All @@ -2946,7 +2947,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
txnCoordRetry: true,
},
{
Expand All @@ -2968,7 +2969,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
filter: newUncertaintyFilter(roachpb.Key("a")),
clientRetry: true, // will fail because of conflict on refresh span for the Get
},
{
Expand All @@ -2982,7 +2983,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand All @@ -2992,7 +2993,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.Scan(ctx, "a", "d", 0)
return err
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand All @@ -3002,7 +3003,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.DelRange(ctx, "a", "d", false /* returnKeys */)
return err
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand Down
42 changes: 24 additions & 18 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -52,24 +53,29 @@ func DefaultDeclareIsolatedKeys(
timestamp := header.Timestamp
if roachpb.IsReadOnly(req) && !roachpb.IsLocking(req) {
access = spanset.SpanReadOnly
if header.Txn != nil {
// For transactional reads, acquire read latches all the way up to
// the transaction's uncertainty limit, because reads may observe
// writes all the way up to this timestamp.
//
// It is critical that reads declare latches up through their
// uncertainty interval so that they are properly synchronized with
// earlier writes that may have a happened-before relationship with
// the read. These writes could not have completed and returned to
// the client until they were durable in the Range's Raft log.
// However, they may not have been applied to the replica's state
// machine by the time the write was acknowledged, because Raft
// entry application occurs asynchronously with respect to the
// writer (see AckCommittedEntriesBeforeApplication). Latching is
// the only mechanism that ensures that any observers of the write
// wait for the write apply before reading.
timestamp.Forward(header.Txn.GlobalUncertaintyLimit)
}

// For non-locking reads, acquire read latches all the way up to the
// request's worst-case (i.e. global) uncertainty limit, because reads may
// observe writes all the way up to this timestamp.
//
// It is critical that reads declare latches up through their uncertainty
// interval so that they are properly synchronized with earlier writes that
// may have a happened-before relationship with the read. These writes could
// not have completed and returned to the client until they were durable in
// the Range's Raft log. However, they may not have been applied to the
// replica's state machine by the time the write was acknowledged, because
// Raft entry application occurs asynchronously with respect to the writer
// (see AckCommittedEntriesBeforeApplication). Latching is the only
// mechanism that ensures that any observers of the write wait for the write
// apply before reading.
//
// NOTE: we pass an empty lease status here, which means that observed
// timestamps collected by transactions will not be used. The actual
// uncertainty interval used by the request may be smaller (i.e. contain a
// local limit), but we can't determine that until after we have declared
// keys, acquired latches, and consulted the replica's lease.
in := uncertainty.ComputeInterval(header, kvserverpb.LeaseStatus{}, maxOffset)
timestamp.Forward(in.GlobalLimit)
}
latchSpans.AddMVCC(access, req.Header().Span(), timestamp)
lockSpans.AddNonMVCC(access, req.Header().Span())
Expand Down
171 changes: 171 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,177 @@ func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
require.IsType(t, &roachpb.ReadWithinUncertaintyIntervalError{}, pErr.GetDetail())
}

// TestNonTxnReadWithinUncertaintyIntervalAfterLeaseTransfer tests a case where
// a non-transactional request defers its timestamp allocation to a replica that
// does not hold the lease at the time of receiving the request, but does by the
// time that the request consults the lease. In the test, a value is written on
// the previous leaseholder at a higher timestamp than that assigned to the non-
// transactional request. After the lease transfer, the non-txn request is
// required by uncertainty.ComputeInterval to forward its local uncertainty
// limit to the new lease start time. This prevents the read from ignoring the
// previous write, which avoids a stale read. Instead, the non-txn read hits an
// uncertainty error, performs a server-side retry, and re-evaluates with a
// timestamp above the write.
//
// This test exercises the hazard described in "reason #1" of uncertainty.D7.
func TestNonTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// Inject a request filter, which intercepts the server-assigned timestamp
// of a non-transactional request and then blocks that request until after
// the lease has been transferred to the server.
type nonTxnGetKey struct{}
nonTxnOrigTsC := make(chan hlc.Timestamp, 1)
nonTxnBlockerC := make(chan struct{})
requestFilter := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if ctx.Value(nonTxnGetKey{}) != nil {
// Give the test the server-assigned timestamp.
require.NotNil(t, ba.TimestampFromServerClock)
nonTxnOrigTsC <- ba.Timestamp
// Wait for the test to give the go-ahead.
select {
case <-nonTxnBlockerC:
case <-ctx.Done():
}
}
return nil
}
var uncertaintyErrs int32
concurrencyRetryFilter := func(ctx context.Context, _ roachpb.BatchRequest, pErr *roachpb.Error) {
if ctx.Value(nonTxnGetKey{}) != nil {
if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok {
atomic.AddInt32(&uncertaintyErrs, 1)
}
}
}

const numNodes = 2
var manuals []*hlc.HybridManualClock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewHybridManualClock())
}
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manuals[i].UnixNano,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: requestFilter,
TestingConcurrencyRetryFilter: concurrencyRetryFilter,
},
},
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

// Split off a scratch range and upreplicate to node 2.
key := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Pause the servers' clocks going forward.
var maxNanos int64
for _, m := range manuals {
m.Pause()
if cur := m.UnixNano(); cur > maxNanos {
maxNanos = cur
}
}
// After doing so, perfectly synchronize them.
for _, m := range manuals {
m.Increment(maxNanos - m.UnixNano())
}

// Initiate a non-txn read on node 2. The request will be intercepted after
// the request has received a server-assigned timestamp, but before it has
// consulted the lease. We'll transfer the lease to node 2 before the request
// checks, so that it ends up evaluating on node 2.
type resp struct {
*roachpb.BatchResponse
*roachpb.Error
}
nonTxnRespC := make(chan resp, 1)
_ = tc.Stopper().RunAsyncTask(ctx, "non-txn get", func(ctx context.Context) {
ctx = context.WithValue(ctx, nonTxnGetKey{}, "foo")
ba := roachpb.BatchRequest{}
ba.RangeID = desc.RangeID
ba.Add(getArgs(key))
br, pErr := tc.GetFirstStoreFromServer(t, 1).Send(ctx, ba)
nonTxnRespC <- resp{br, pErr}
})

// Wait for the non-txn read to get stuck.
var nonTxnOrigTs hlc.Timestamp
select {
case nonTxnOrigTs = <-nonTxnOrigTsC:
case nonTxnResp := <-nonTxnRespC:
t.Fatalf("unexpected response %+v", nonTxnResp)
}

// Advance the clock on node 1.
manuals[0].Increment(100)

// Perform a non-txn write on node 1. This will grab a timestamp from node 1's
// clock, which leads the clock on node 2 and the timestamp assigned to the
// non-txn read.
//
// NOTE: we perform the clock increment and write _after_ sending the non-txn
// read. Ideally, we would write this test such that we did this before
// beginning the read on node 2, so that the absence of an uncertainty error
// would be a true "stale read". However, doing so causes the test to be flaky
// because background operations can leak the clock signal from node 1 to node
// 2 between the time that we write and the time that the non-txn read request
// is sent. If we had a way to disable all best-effort HLC clock stabilization
// channels and only propagate clock signals when strictly necessary then it's
// possible that we could avoid flakiness. For now, we just re-order the
// operations and assert that we observe an uncertainty error even though its
// absence would not be a true stale read.
ba := roachpb.BatchRequest{}
ba.Add(putArgs(key, []byte("val")))
br, pErr := tc.Servers[0].DistSender().Send(ctx, ba)
require.Nil(t, pErr)
writeTs := br.Timestamp
require.True(t, nonTxnOrigTs.Less(writeTs))

// Then transfer the lease to node 2. The new lease should end up with a start
// time above the timestamp assigned to the non-txn read.
var lease roachpb.Lease
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(desc.StartKey)
lease, _ = repl.GetLease()
if lease.Replica.NodeID != repl.NodeID() {
return errors.Errorf("expected lease transfer to node 2: %s", lease)
}
return nil
})
require.True(t, nonTxnOrigTs.Less(lease.Start.ToTimestamp()))

// Let the non-txn read proceed. It should complete, but only after hitting a
// ReadWithinUncertaintyInterval, performing a server-side retry, reading
// again at a higher timestamp, and returning the written value.
close(nonTxnBlockerC)
nonTxnResp := <-nonTxnRespC
require.Nil(t, nonTxnResp.Error)
br = nonTxnResp.BatchResponse
require.NotNil(t, br)
require.True(t, nonTxnOrigTs.Less(br.Timestamp))
require.True(t, writeTs.LessEq(br.Timestamp))
require.Len(t, br.Responses, 1)
require.NotNil(t, br.Responses[0].GetGet())
require.NotNil(t, br.Responses[0].GetGet().Value)
require.Equal(t, writeTs, br.Responses[0].GetGet().Value.Timestamp)
require.Equal(t, int32(1), atomic.LoadInt32(&uncertaintyErrs))
}

// TestRangeLookupUseReverse tests whether the results and the results count
// are correct when scanning in reverse order.
func TestRangeLookupUseReverse(t *testing.T) {
Expand Down
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func maybeBumpReadTimestampToWriteTimestamp(
func tryBumpBatchTimestamp(
ctx context.Context, ba *roachpb.BatchRequest, ts hlc.Timestamp, latchSpans *spanset.SpanSet,
) bool {
if len(latchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) > 0 {
if latchSpans != nil && len(latchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) > 0 {
// If the batch acquired any read latches with bounded (MVCC) timestamps
// then we can not trivially bump the batch's timestamp without dropping
// and re-acquiring those latches. Doing so could allow the request to
Expand All @@ -222,27 +222,24 @@ func tryBumpBatchTimestamp(
// table that they should have seen or discovering replicated intents in
// MVCC that they should not have seen (from the perspective of the lock
// table's AddDiscoveredLock method).
//
// NOTE: we could consider adding a retry-loop above the latch
// acquisition to allow this to be retried, but given that we try not to
// mix read-only and read-write requests, doing so doesn't seem worth
// it.
return false
}
if ts.Less(ba.Timestamp) {
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
ba.Timestamp = ts
if ba.Txn == nil {
log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
ba.Timestamp = ts
return true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)",
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba.Txn = ba.Txn.Clone()
ba.Txn.Refresh(ts)
ba.Timestamp = ba.Txn.ReadTimestamp
return true
}
Loading

0 comments on commit 64eb718

Please sign in to comment.