diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index a5b90d7911b6..e04fee709b2b 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -113,7 +113,7 @@ func TestCanSendToFollower(t *testing.T) { } withServerSideBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest { ba = withBatchTimestamp(ba, ts) - ba.TimestampFromServerClock = true + ba.TimestampFromServerClock = (*hlc.ClockTimestamp)(&ts) return ba } diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 1956bc869711..e951bfd32685 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -2252,7 +2252,7 @@ func TestQuotaPool(t *testing.T) { value := bytes.Repeat([]byte("v"), (3*quota)/4) var ba roachpb.BatchRequest ba.Add(putArgs(keyToWrite, value)) - if err := ba.SetActiveTimestamp(tc.Servers[0].Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Servers[0].Clock()); err != nil { t.Fatal(err) } if _, pErr := leaderRepl.Send(ctx, ba); pErr != nil { @@ -2273,7 +2273,7 @@ func TestQuotaPool(t *testing.T) { go func() { var ba roachpb.BatchRequest ba.Add(putArgs(keyToWrite, value)) - if err := ba.SetActiveTimestamp(tc.Servers[0].Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Servers[0].Clock()); err != nil { ch <- roachpb.NewError(err) return } @@ -2363,7 +2363,7 @@ func TestWedgedReplicaDetection(t *testing.T) { value := []byte("value") var ba roachpb.BatchRequest ba.Add(putArgs(key, value)) - if err := ba.SetActiveTimestamp(tc.Servers[0].Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Servers[0].Clock()); err != nil { t.Fatal(err) } if _, pErr := leaderRepl.Send(ctx, ba); pErr != nil { diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 42f760dfb975..55e703585350 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -107,7 +107,7 @@ func TestClosedTimestampCanServe(t *testing.T) { baWrite.Txn = &txn baWrite.Add(r) baWrite.RangeID = repls[0].RangeID - if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil { + if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock()); err != nil { t.Fatal(err) } @@ -589,7 +589,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { // Otherwise, all three should succeed. baRead.Txn = nil if tsFromServer { - baRead.TimestampFromServerClock = true + baRead.TimestampFromServerClock = (*hlc.ClockTimestamp)(&ts) verifyNotLeaseHolderErrors(t, baRead, repls, 2) } else { testutils.SucceedsSoon(t, func() error { diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 21615ade3096..a189a6798280 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -49,7 +49,7 @@ func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { // propose writes to Raft. // 4. the batch needs to be non-locking, because unreplicated locks are only // held on the leaseholder. - tsFromServerClock := ba.Txn == nil && (ba.Timestamp.IsEmpty() || ba.TimestampFromServerClock) + tsFromServerClock := ba.Txn == nil && (ba.Timestamp.IsEmpty() || ba.TimestampFromServerClock != nil) if tsFromServerClock { return false } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 36cb73be835e..938a58d7a340 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -226,7 +226,7 @@ func (tc *testContext) Sender() kv.Sender { ba.RangeID = 1 } if ba.Timestamp.IsEmpty() { - if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Clock()); err != nil { tc.Fatal(err) } } @@ -4511,7 +4511,7 @@ func TestEndTxnRollbackAbortedTransaction(t *testing.T) { var ba roachpb.BatchRequest gArgs := getArgs(key) ba.Add(&gArgs) - if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Clock()); err != nil { t.Fatal(err) } _, pErr := tc.Sender().Send(ctx, ba) @@ -4677,7 +4677,7 @@ func TestBatchRetryCantCommitIntents(t *testing.T) { ba.Header = roachpb.Header{Txn: txn} ba.Add(&put) assignSeqNumsForReqs(txn, &put) - if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Clock()); err != nil { t.Fatal(err) } br, pErr := tc.Sender().Send(ctx, ba) @@ -4845,7 +4845,7 @@ func setupResolutionTest( var ba roachpb.BatchRequest ba.Header = h ba.RangeID = newRepl.RangeID - if err := ba.SetActiveTimestamp(newRepl.store.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(newRepl.store.Clock()); err != nil { t.Fatal(err) } pArgs := putArgs(splitKey.AsRawKey(), []byte("value")) @@ -4898,7 +4898,7 @@ func TestEndTxnResolveOnlyLocalIntents(t *testing.T) { ba.Header.RangeID = newRepl.RangeID gArgs := getArgs(splitKey) ba.Add(&gArgs) - if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Clock()); err != nil { t.Fatal(err) } _, pErr := newRepl.Send(ctx, ba) @@ -7501,7 +7501,7 @@ func TestReplicaCancelRaft(t *testing.T) { ba.Add(&roachpb.GetRequest{ RequestHeader: roachpb.RequestHeader{Key: key}, }) - if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(tc.Clock()); err != nil { t.Fatal(err) } _, pErr := tc.repl.executeBatchWithConcurrencyRetries(ctx, &ba, (*Replica).executeWriteBatch) @@ -9128,7 +9128,7 @@ func TestNoopRequestsNotProposed(t *testing.T) { ba.Header.RangeID = repl.RangeID ba.Add(req) ba.Txn = txn - if err := ba.SetActiveTimestamp(repl.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(repl.Clock()); err != nil { t.Fatal(err) } _, pErr := repl.Send(ctx, ba) @@ -9421,7 +9421,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { ba.Header.Txn = txn ba.Add(&etArgs) assignSeqNumsForReqs(txn, &etArgs) - require.NoError(t, ba.SetActiveTimestamp(func() hlc.Timestamp { return hlc.Timestamp{} })) + require.NoError(t, ba.SetActiveTimestamp(s.Clock())) // Get a reference to the txn's replica. stores := s.GetStores().(*Stores) store, err := stores.GetStore(s.GetFirstStoreID()) diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index fdbc40f2a876..d908beda3461 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -69,7 +69,7 @@ func (s *Store) Send( ba = newBa } - if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil { + if err := ba.SetActiveTimestamp(s.Clock()); err != nil { return nil, roachpb.NewError(err) } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 4e5a6ffe7dec..e258d9a0076b 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2135,14 +2135,24 @@ message Header { // (txn.WriteTimestamp). util.hlc.Timestamp timestamp = 1 [(gogoproto.nullable) = false]; // timestamp_from_server_clock is set when a non-transactional BatchRequest - // has its timestamp initialized to the wall time of the server node. This - // flag is needed to ensure that such requests are never served as follower + // has its timestamp initialized to the wall time of the server node. When + // non-nil, this denotes the clock timestamp that the timestamp field was + // initially assigned upon arriving at the server node. + // + // It is needed to ensure that such requests are never served as follower // reads. Initializing the timestamp of a request on a node that holds a // follower instead of the leaseholder for a range and then using this // timestamp to deem a follower read safe could allow for consistency // violations where a non-transactional read following a write could fail to // observe the write. - bool timestamp_from_server_clock = 20; + // + // It is also needed to record the time at which the request was received by + // the server node. The operation timestamp cannot serve this role because it + // can change due to server-side uncertainty retries. By remembering a stable + // reference to the initial timestamp, we ensure that a non-transactional + // request's uncertainty interval remains fixed across retries. + util.hlc.Timestamp timestamp_from_server_clock = 27 [ + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; // replica specifies the destination of the request. ReplicaDescriptor replica = 2 [(gogoproto.nullable) = false]; // range_id specifies the ID of the Raft consensus group which the key @@ -2376,7 +2386,7 @@ message Header { util.tracing.tracingpb.TraceInfo trace_info = 25 [(gogoproto.nullable) = false]; - reserved 7, 10, 12, 14; + reserved 7, 10, 12, 14, 20; } // BoundedStalenessHeader contains configuration values pertaining to bounded diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index b18ceeec5c1e..ae2dfdc3a4af 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -54,8 +54,8 @@ func (h Header) RequiredFrontier() hlc.Timestamp { // carried out. For transactional requests, ba.Timestamp must be zero initially // and it will be set to txn.ReadTimestamp (note though this mostly impacts // reads; writes use txn.WriteTimestamp). For non-transactional requests, if no -// timestamp is specified, nowFn is used to create and set one. -func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error { +// timestamp is specified, clock is used to create and set one. +func (ba *BatchRequest) SetActiveTimestamp(clock *hlc.Clock) error { if txn := ba.Txn; txn != nil { if !ba.Timestamp.IsEmpty() { return errors.New("transactional request must not set batch timestamp") @@ -71,10 +71,11 @@ func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error { // txn.WriteTimestamp, regardless of the batch timestamp. ba.Timestamp = txn.ReadTimestamp } else { - // When not transactional, allow empty timestamp and use nowFn instead + // When not transactional, allow empty timestamp and use clock instead. if ba.Timestamp.IsEmpty() { - ba.Timestamp = nowFn() - ba.TimestampFromServerClock = true + now := clock.NowAsClockTimestamp() + ba.Timestamp = now.ToTimestamp() // copies value, not aliasing reference + ba.TimestampFromServerClock = &now } } return nil