kv/storage: disable observed timestamps for async resolved intents that are pushed

Fixes #36431.
Fixes #49360.

This commit fixes the potential for a stale read as detailed in #36431, using
the approach described in #36431 (comment).
This bug requires a combination of skewed clocks, multi-key transactions split
across ranges whose leaseholders are stored on different nodes, a transaction
read refresh, and the use of observed timestamps to avoid an uncertainty
restart. With the combination of these four factors, it was possible to
construct an ordering of events that violated real-time ordering and allowed a
transaction to observe a stale read. Upon the discovery of the bug, we
[introduced](cockroachdb/jepsen#19) the `multi-register`
test to the Jepsen test suite, and have since observed the test fail when
combined with the `strobe-skews` nemesis due to this bug in #49360 (and a few
issues linked to that one). This commit stabilizes that test.

### Explanation

The combination of all of the factors listed above can lead to the stale read
because it breaks one of the invariants that the observed timestamp
infrastructure[^1] relied upon for correctness. Specifically, observed
timestamps relied on the guarantee that a leaseholder's clock must always be
equal to or greater than the timestamp of all writes that it has served.
However, this guarantee did not always hold. It does hold for non-transactional
writes. It also holds for transactions that perform all of their intent writes
at the same timestamp and then commit at this timestamp. However, it does not
hold for transactions which move their commit timestamp forward over their
lifetime before committing, writing intents at different timestamps along the
way and "pulling them up" to the commit timestamp after committing.
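
To make the broken invariant concrete, the following toy model sketches this third case (illustrative only; the types and names here are assumptions, not CockroachDB code):

```go
package main

import "fmt"

// ts is a simplified stand-in for an HLC timestamp.
type ts int64

// leaseholder models the invariant that observed timestamps relied upon:
// serving a write forwards the local clock, so the clock is always >= every
// write timestamp the node has directly served.
type leaseholder struct{ clock ts }

func (l *leaseholder) serveWrite(t ts) {
	if l.clock < t {
		l.clock = t // forward the clock past the served write
	}
}

func main() {
	a := &leaseholder{clock: 100}
	a.serveWrite(95) // provisional intent write at ts 95; clock stays at 100

	// The transaction later refreshes and commits at ts 105 on a different
	// node. Node a's clock still reads 100, yet the intent it holds will
	// eventually resolve to ts 105, so the clock no longer bounds the
	// timestamps of all committed values the node stores.
	commitTs := ts(105)
	fmt.Println(a.clock < commitTs) // true: the invariant is violated
}
```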

In violating the invariant, this third case reveals an ambiguity in what it
means for a leaseholder to "serve a write at a timestamp". The meaning of this
phrase is straightforward for non-transactional writes. However, for an intent
write whose original timestamp is provisional and whose eventual commit
timestamp is stored indirectly in its transaction record at its time of commit,
the meaning is less clear. The reconciliation that moves the intent write's
timestamp up to its transaction's commit timestamp happens asynchronously from
the transaction commit (and after the commit has been externally acknowledged).
So even if a
leaseholder has only served writes with provisional timestamps up to timestamp
100 (placing a lower bound on its clock of 100), it can be in possession of
intents that, when resolved, will carry a timestamp of 200. To uphold the
real-time ordering property, this value must be observed by any transaction that
begins after the value's transaction committed and was acknowledged. So for
observed timestamps to be correct as currently written, we would need a
guarantee that this value's leaseholder would never return an observed timestamp
< 200 at any point after the transaction commits. But with the transaction
commit possibly occurring on another node and with communication to resolve the
intent occurring asynchronously, this seems like an impossible guarantee to
make.

This would appear to undermine observed timestamps to the point where they
cannot be used. However, we can claw back some utility by recognizing that only
a small fraction[^2] of transactions commit at a different timestamp than the
one they used while writing intents. We can also recognize that if we were to
compare observed timestamps against the timestamp at which a committed value was
originally written (its provisional timestamp if it was once an intent) instead
of the timestamp that it had been moved to on commit, then the invariant would
hold.

This commit does not take full advantage of this second observation, because we
do not retain "written timestamps" in committed values (though we do in
intents). Instead, we do something less optimal but cheaper and more convenient.
Any intent whose timestamp is changed during asynchronous intent resolution is
marked as "synthetic". Doing so is a compressed way of saying that the value
could have originally been written as an intent at any time (even min_timestamp)
and so observed timestamps cannot be used to limit uncertainty by pulling a
read's uncertainty interval below the value's timestamp.
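
A minimal sketch of this marking rule (simplified types; the real change threads an `asyncResolution` flag through `storage.MVCCResolveWriteIntent`, as shown in the diff below):

```go
package main

import "fmt"

// timestamp is a simplified stand-in for hlc.Timestamp and its Synthetic flag.
type timestamp struct {
	wallNanos int64
	synthetic bool
}

// resolvedTimestamp computes the timestamp that a resolved intent should
// carry. If resolution is asynchronous with the transaction commit and moves
// the intent's timestamp forward, the result is flagged as synthetic so that
// observed timestamps can no longer be used against the value.
func resolvedTimestamp(intentTs, commitTs timestamp, asyncResolution bool) timestamp {
	out := commitTs
	if asyncResolution && intentTs.wallNanos < commitTs.wallNanos {
		// The value's new timestamp no longer reflects its original write
		// time, so it makes no claim about the leaseholder's clock.
		out.synthetic = true
	}
	return out
}

func main() {
	intent, commit := timestamp{wallNanos: 95}, timestamp{wallNanos: 105}
	fmt.Println(resolvedTimestamp(intent, commit, true).synthetic)  // true: async and moved
	fmt.Println(resolvedTimestamp(intent, commit, false).synthetic) // false: synchronous
}
```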

This application of the synthetic bit prevents observed timestamps from being
used to avoid uncertainty restarts with the set of committed values whose
timestamps do not reflect their original write time and therefore do not make a
claim about the clock of their leaseholder at the time that they were committed.
It does not change the interaction between observed timestamps and any other
committed value.
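
The effect on the read path can be sketched as follows (a simplified model of the logic described in pkg/kv/kvserver/observedts/doc.go; the function and field names are assumptions):

```go
package main

import "fmt"

type timestamp struct {
	wallNanos int64
	synthetic bool
}

func (t timestamp) less(o timestamp) bool { return t.wallNanos < o.wallNanos }

// uncertain reports whether a committed value at valueTs forces an
// uncertainty restart for a read at readTs. An observed timestamp taken from
// the value's leaseholder may lower the reader's uncertainty limit, but only
// for values whose timestamps are not synthetic.
func uncertain(readTs, valueTs, observed, globalLimit timestamp) bool {
	if !readTs.less(valueTs) {
		return false // the value is visible to the read; no restart needed
	}
	limit := globalLimit
	if !valueTs.synthetic && observed.less(limit) {
		limit = observed // the observed timestamp pulls the limit down
	}
	return !limit.less(valueTs) // valueTs <= limit means uncertain
}

func main() {
	read := timestamp{wallNanos: 100}
	observed := timestamp{wallNanos: 100}
	global := timestamp{wallNanos: 600} // read ts + max clock offset

	// A value moved to ts 105 by async resolution is synthetic, so the
	// observed timestamp may not lower the limit: a restart is forced.
	fmt.Println(uncertain(read, timestamp{105, true}, observed, global)) // true

	// A non-synthetic value at ts 105 was really written after the reader
	// captured its observed timestamp, so no restart is needed.
	fmt.Println(uncertain(read, timestamp{105, false}, observed, global)) // false
}
```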

### Correctness testing

I was not able to stress `jepsen/multi-register/strobe-skews` hard enough to
cause it to fail, even on master. We've only seen the test fail a handful of
times over the past few years, so this isn't much of a surprise. Still, it
prevents us from saying anything concrete about a reduced failure rate.

However, the commit does add a new test called `TestTxnReadWithinUncertaintyIntervalAfterIntentResolution`
which controls manual clocks directly and was able to deterministically
reproduce the stale read before this fix in a few different ways. After this
fix, the test passes.

### Performance analysis

This correctness fix will lead to an increased rate of transaction retries under
some workloads.

TODO(nvanbenschoten):
- observe TPC-C performance
  - top-line performance
  - uncertainty retry rate
  - commit-wait rate (should be zero)
- compare YCSB performance

----

Release note (bug fix): fixed a rare race condition that could allow a
transaction to serve a stale read and violate real-time ordering under moderate
clock skew.

[^1]: see [pkg/kv/kvserver/observedts/doc.go](https://github.com/cockroachdb/cockroach/blob/master/pkg/kv/kvserver/observedts/doc.go)
for an explanation of the role of observed timestamps in the transaction model.
This commit updates that documentation to include this fix.

[^2]: see analysis in #36431 (comment).
nvanbenschoten committed Oct 28, 2021
1 parent a1cf8e9 commit 6f4942f
Showing 23 changed files with 653 additions and 141 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/event.go
@@ -187,6 +187,11 @@ func (b *Event) DetachAlloc() Alloc {
func MakeResolvedEvent(
span roachpb.Span, ts hlc.Timestamp, boundaryType jobspb.ResolvedSpan_BoundaryType,
) Event {
// Strip synthetic bit from CDC events. It can confuse consumers.
// TODO DURING PR: this is needed to fix a few tests, but hints at a larger
// question of whether we like that the synthetic timestamp flag can escape
// outside the system.
ts.Synthetic = false
return Event{
resolved: &jobspb.ResolvedSpan{
Span: span,
@@ -201,6 +206,11 @@
func MakeKVEvent(
kv roachpb.KeyValue, prevVal roachpb.Value, backfillTimestamp hlc.Timestamp,
) Event {
// Strip synthetic bit from CDC events. It can confuse consumers.
// TODO DURING PR: this is needed to fix a few tests, but hints at a larger
// question of whether we like that the synthetic timestamp flag can escape
// outside the system.
kv.Value.Timestamp.Synthetic = false
return Event{
kv: kv,
prevVal: prevVal,
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
@@ -1299,7 +1299,7 @@ func removeDeadReplicas(
Txn: intent.Txn,
Status: roachpb.ABORTED,
}
if _, err := storage.MVCCResolveWriteIntent(ctx, batch, &ms, update); err != nil {
if _, err := storage.MVCCResolveWriteIntent(ctx, batch, &ms, update, false /* asyncResolution */); err != nil {
return nil, err
}
// With the intent resolved, we can try again.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batch_spanset_test.go
@@ -596,7 +596,7 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
Status: roachpb.PENDING,
}
if _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, 0, eng.IsSeparatedIntentsEnabledForTesting(ctx),
ctx, batch, nil /* ms */, intent, 0, false /* asyncResolution */, eng.IsSeparatedIntentsEnabledForTesting(ctx),
); err != nil {
t.Fatal(err)
}
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -451,7 +451,9 @@ func resolveLocalLocks(
desc = &mergeTrigger.LeftDesc
}

var resolveAllowance int64 = lockResolutionBatchSize
// Any intent resolved here is resolved synchronously with the txn commit.
const asyncResolution = false
resolveAllowance := int64(lockResolutionBatchSize)
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't enforce the resolve allowance.
// These transactions rely on having their locks resolved synchronously.
@@ -486,7 +488,7 @@
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, asyncResolution)
if err != nil {
return err
}
@@ -504,7 +506,7 @@
if inSpan != nil {
update.Span = *inSpan
num, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, resolveAllowance, onlySeparatedIntents)
ctx, readWriter, ms, update, resolveAllowance, asyncResolution, onlySeparatedIntents)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
@@ -86,7 +86,7 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// would not have any timestamp bounds and would be selected for every read.
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, false /* asyncResolution */); err != nil {
t.Fatal(err)
}
if err := storage.MVCCPut(ctx, db, nil, roachpb.Key("unused2"), ts1, v, nil); err != nil {
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
@@ -84,7 +84,10 @@ func ResolveIntent(
}

update := args.AsLockUpdate()
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
// This intent resolution operation is asynchronous with its transaction and
// may be occurring after the transaction has already committed.
const asyncResolution = true
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, asyncResolution)
if err != nil {
return result.Result{}, err
}
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
@@ -45,11 +45,14 @@ func ResolveIntentRange(

update := args.AsLockUpdate()

// This intent resolution operation is asynchronous with its transaction and
// may be occurring after the transaction has already committed.
const asyncResolution = true
onlySeparatedIntents :=
cArgs.EvalCtx.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).IsActive(
clusterversion.PostSeparatedIntentsMigration)
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys, onlySeparatedIntents)
ctx, readWriter, ms, update, h.MaxSpanRequestKeys, asyncResolution, onlySeparatedIntents)
if err != nil {
return result.Result{}, err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/below_raft_protos_test.go
@@ -72,7 +72,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return m
},
emptySum: 7551962144604783939,
populatedSum: 6784975417727259950,
populatedSum: 4745626903798922387,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
238 changes: 238 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
@@ -577,6 +577,244 @@ func TestTxnReadWithinUncertaintyInterval(t *testing.T) {
})
}

// TestTxnReadWithinUncertaintyIntervalAfterIntentResolution tests cases where a
// reader transaction observes a committed value that was committed before the
// reader began, but that was resolved after the reader began. The test ensures
// that even if the reader has collected an observed timestamp from the node
// that holds the intent, and even if this observed timestamp is less than the
// timestamp that the intent is eventually committed at, the reader still
// considers the value to be in its uncertainty interval. Not doing so could
// allow for a stale read, which would be a violation of linearizability.
//
// This is a regression test for #36431. Before this issue was addressed,
// it was possible for the following series of events to lead to a stale
// read:
// - txn W is coordinated by node B. It lays down an intent on node A (key k) at
// ts 95.
// - txn W gets pushed to ts 105 (taken from B's clock). It refreshes
// successfully and commits at 105. Node A's clock is at, say, 100; this is
// within clock offset bounds.
// - after all this, txn R starts on node A. It gets assigned ts 100. The txn
// has no uncertainty for node A.
// - txn W's async intent resolution comes around and resolves the intent on
// node A, moving the value fwd from ts 95 to 105.
// - txn R reads key k and doesn't see anything. There's a value at 105, but the
// txn has no uncertainty due to an observed timestamp. This is a stale read.
//
// The test's rangedResolution parameter dictates whether the intent is
// asynchronously resolved using point or ranged intent resolution.
//
// The test's movedWhilePending parameter dictates whether the intent is moved
// to a higher timestamp first by a PENDING intent resolution and then COMMITTED
// at that same timestamp, or whether it is moved to a higher timestamp at the
// same time as it is COMMITTED.
func TestTxnReadWithinUncertaintyIntervalAfterIntentResolution(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "rangedResolution", func(t *testing.T, rangedResolution bool) {
testutils.RunTrueAndFalse(t, "movedWhilePending", func(t *testing.T, movedWhilePending bool) {
testTxnReadWithinUncertaintyIntervalAfterIntentResolution(t, rangedResolution, movedWhilePending)
})
})
}

func testTxnReadWithinUncertaintyIntervalAfterIntentResolution(
t *testing.T, rangedResolution, movedWhilePending bool,
) {
const numNodes = 2
var manuals []*hlc.HybridManualClock
var clocks []*hlc.Clock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewHybridManualClock())
}
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manuals[i].UnixNano,
},
Store: &kvserver.StoreTestingKnobs{
IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{
// Disable async intent resolution, so that the test can carefully
// control when intent resolution occurs.
DisableAsyncIntentResolution: true,
},
},
},
}
}
ctx := context.Background()
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

// Split off two scratch ranges.
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
tc.SplitRangeOrFatal(t, keyA)
_, keyBDesc := tc.SplitRangeOrFatal(t, keyB)
// Place key A's sole replica on node 1 and key B's sole replica on node 2.
tc.AddVotersOrFatal(t, keyB, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, keyBDesc, tc.Target(1))
tc.RemoveVotersOrFatal(t, keyB, tc.Target(0))

// Pause the servers' clocks going forward.
var maxNanos int64
for i, m := range manuals {
m.Pause()
if cur := m.UnixNano(); cur > maxNanos {
maxNanos = cur
}
clocks = append(clocks, tc.Servers[i].Clock())
}
// After doing so, perfectly synchronize them.
for _, m := range manuals {
m.Increment(maxNanos - m.UnixNano())
}

// Create a new writer transaction.
maxOffset := clocks[0].MaxOffset().Nanoseconds()
require.NotZero(t, maxOffset)
writerTxn := roachpb.MakeTransaction("test_writer", keyA, 1, clocks[0].Now(), maxOffset)

// Write to key A and key B in the writer transaction.
for _, key := range []roachpb.Key{keyA, keyB} {
put := putArgs(key, []byte("val"))
resp, pErr := kv.SendWrappedWith(ctx, tc.Servers[0].DistSender(), roachpb.Header{Txn: &writerTxn}, put)
require.Nil(t, pErr)
writerTxn.Update(resp.Header().Txn)
}

// Move the clock on just the first server and bump the transaction commit
// timestamp to this value. The clock on the second server will trail behind.
manuals[0].Increment(100)
require.True(t, writerTxn.WriteTimestamp.Forward(clocks[0].Now()))

// Refresh the writer transaction's timestamp.
writerTxn.ReadTimestamp.Forward(writerTxn.WriteTimestamp)

// Commit the writer transaction. Key A will be synchronously resolved because
// it is on the same range as the transaction record. However, key B will be
// handed to the IntentResolver for asynchronous resolution. Because we
// disabled async resolution, it will not be resolved yet.
et, etH := endTxnArgs(&writerTxn, true /* commit */)
et.LockSpans = []roachpb.Span{
{Key: keyA}, {Key: keyB},
}
if rangedResolution {
for i := range et.LockSpans {
et.LockSpans[i].EndKey = et.LockSpans[i].Key.Next()
}
}
etResp, pErr := kv.SendWrappedWith(ctx, tc.Servers[0].DistSender(), etH, et)
require.Nil(t, pErr)
writerTxn.Update(etResp.Header().Txn)

// Create a new reader transaction. The reader uses the second server as a
// gateway, so its initial read timestamp actually trails the commit timestamp
// of the writer transaction due to clock skew between the two servers. This
// is the classic case where the reader's uncertainty interval is needed to
// avoid stale reads. Remember that the reader transaction began after the
// writer transaction committed and received an ack, so it must observe the
// writer's writes if it is to respect real-time ordering.
readerTxn := roachpb.MakeTransaction("test_reader", keyA, 1, clocks[1].Now(), maxOffset)
require.True(t, readerTxn.ReadTimestamp.Less(writerTxn.WriteTimestamp))
require.False(t, readerTxn.GlobalUncertaintyLimit.Less(writerTxn.WriteTimestamp))

// Collect an observed timestamp from each of the nodes. We read the key
// following each of the written keys to avoid conflicting with the written values.
//
// NOTE: this wasn't even a necessary step to hit #36431, because new
// transactions always have an observed timestamp from their own gateway node.
for i, key := range []roachpb.Key{keyA, keyB} {
get := getArgs(key.Next())
resp, pErr := kv.SendWrappedWith(ctx, tc.Servers[0].DistSender(), roachpb.Header{Txn: &readerTxn}, get)
require.Nil(t, pErr)
require.Nil(t, resp.(*roachpb.GetResponse).Value)
readerTxn.Update(resp.Header().Txn)
require.Len(t, readerTxn.ObservedTimestamps, i+1)
}

// Resolve the intent on key B.
{
resolveIntentArgs := func(status roachpb.TransactionStatus) roachpb.Request {
if rangedResolution {
return &roachpb.ResolveIntentRangeRequest{
RequestHeader: roachpb.RequestHeader{Key: keyB, EndKey: keyB.Next()},
IntentTxn: writerTxn.TxnMeta,
Status: status,
}
} else {
return &roachpb.ResolveIntentRequest{
RequestHeader: roachpb.RequestHeader{Key: keyB},
IntentTxn: writerTxn.TxnMeta,
Status: status,
}
}
}

if movedWhilePending {
// First change the intent's timestamp without committing it. This
// exercises the case where the intent's timestamp is moved forward by a
// PENDING intent resolution request and kept the same when the intent is
// eventually COMMITTED. This PENDING intent resolution may still be
// evaluated after the transaction commit has been acknowledged in
// real-time, so it still needs to lead to the committed value being
// marked as synthetic. This works by recording the written timestamp into
// the intent when it is originally moved and then consulting this written
// timestamp when the intent is later committed.
//
// For instance, consider the following timeline:
//
// 1. txn W writes intent on key A @ time 10
// 2. txn W writes intent on key B @ time 10
// 3. high priority reader @ 15 reads key B
// 4. high priority reader pushes txn W to time 15
// 5. txn W commits @ 15 and resolves key A synchronously
// 6. txn R begins and collects observed timestamp from key B's node @
// time 11
// 7. high priority reader moves intent on key B to time 15
// 8. async intent resolution commits intent on key B, still @ time 15
// 9. txn R reads key B with read ts 11, observed ts 11, and uncertainty
// interval [11, 21]. If step 7 did not mark the intent's timestamp as
// synthetic when changing its timestamp, step 8 also would not do so
// because it isn't changing the intent's timestamp, so txn R could
// use its observed timestamp to avoid an uncertainty error, leading
// to a stale read.
//
resolve := resolveIntentArgs(roachpb.PENDING)
_, pErr = kv.SendWrapped(ctx, tc.Servers[0].DistSender(), resolve)
require.Nil(t, pErr)
}

// Resolve the committed value on key B to COMMITTED.
resolve := resolveIntentArgs(roachpb.COMMITTED)
_, pErr = kv.SendWrapped(ctx, tc.Servers[0].DistSender(), resolve)
require.Nil(t, pErr)
}

// Read key A and B in the reader transaction. Both should produce
// ReadWithinUncertaintyIntervalErrors.
for _, key := range []roachpb.Key{keyA, keyB} {
get := getArgs(key)
_, pErr := kv.SendWrappedWith(ctx, tc.Servers[0].DistSender(), roachpb.Header{Txn: &readerTxn}, get)
require.NotNil(t, pErr)
var rwuiErr *roachpb.ReadWithinUncertaintyIntervalError
require.True(t, errors.As(pErr.GetDetail(), &rwuiErr))
require.Equal(t, readerTxn.ReadTimestamp, rwuiErr.ReadTimestamp)
require.Equal(t, readerTxn.GlobalUncertaintyLimit, rwuiErr.GlobalUncertaintyLimit)
require.Equal(t, readerTxn.ObservedTimestamps, rwuiErr.ObservedTimestamps)
// Only the value on key B should be marked as synthetic, because only the
// intent on key B was resolved asynchronously from its transaction commit.
expSyn := key.Equal(keyB)
require.Equal(t, writerTxn.WriteTimestamp.WithSynthetic(expSyn), rwuiErr.ExistingTimestamp)
}
}

// TestRangeLookupUseReverse tests whether the results and the results count
// are correct when scanning in reverse order.
func TestRangeLookupUseReverse(t *testing.T) {
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/client_test.go
@@ -102,6 +102,15 @@ func heartbeatArgs(
}, roachpb.Header{Txn: txn}
}

func endTxnArgs(txn *roachpb.Transaction, commit bool) (*roachpb.EndTxnRequest, roachpb.Header) {
return &roachpb.EndTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key, // not allowed when going through TxnCoordSender, but we're not
},
Commit: commit,
}, roachpb.Header{Txn: txn}
}

func pushTxnArgs(
pusher, pushee *roachpb.Transaction, pushType roachpb.PushTxnType,
) *roachpb.PushTxnRequest {