Skip to content

Commit

Permalink
kv: replace timestamp_from_server_clock bool with nullable ClockTimestamp
Browse files Browse the repository at this point in the history

This commit replaces the boolean `timestamp_from_server_clock` flag on
the `BatchRequest` header with a nullable `ClockTimestamp` field. Doing
so allows for the use of the field to be expanded in a following commit.

The field will soon be needed to record the time at which the request
was received by the server node. The operation timestamp cannot serve
this role because it can change due to server-side uncertainty retries.
By remembering a stable reference to the initial timestamp, we ensure
that a non-transactional request's uncertainty interval remains fixed
across retries.

There are no backwards compatibility concerns with this change because
the previous field was only consulted on the same node that set it. It
was not consulted across RPC boundaries.
  • Loading branch information
nvanbenschoten committed Feb 8, 2022
1 parent 03c50b0 commit 4d9a372
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestCanSendToFollower(t *testing.T) {
}
withServerSideBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
ba = withBatchTimestamp(ba, ts)
ba.TimestampFromServerClock = true
ba.TimestampFromServerClock = (*hlc.ClockTimestamp)(&ts)
return ba
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2252,7 +2252,7 @@ func TestQuotaPool(t *testing.T) {
value := bytes.Repeat([]byte("v"), (3*quota)/4)
var ba roachpb.BatchRequest
ba.Add(putArgs(keyToWrite, value))
if err := ba.SetActiveTimestamp(tc.Servers[0].Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Servers[0].Clock()); err != nil {
t.Fatal(err)
}
if _, pErr := leaderRepl.Send(ctx, ba); pErr != nil {
Expand All @@ -2273,7 +2273,7 @@ func TestQuotaPool(t *testing.T) {
go func() {
var ba roachpb.BatchRequest
ba.Add(putArgs(keyToWrite, value))
if err := ba.SetActiveTimestamp(tc.Servers[0].Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Servers[0].Clock()); err != nil {
ch <- roachpb.NewError(err)
return
}
Expand Down Expand Up @@ -2363,7 +2363,7 @@ func TestWedgedReplicaDetection(t *testing.T) {
value := []byte("value")
var ba roachpb.BatchRequest
ba.Add(putArgs(key, value))
if err := ba.SetActiveTimestamp(tc.Servers[0].Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Servers[0].Clock()); err != nil {
t.Fatal(err)
}
if _, pErr := leaderRepl.Send(ctx, ba); pErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestClosedTimestampCanServe(t *testing.T) {
baWrite.Txn = &txn
baWrite.Add(r)
baWrite.RangeID = repls[0].RangeID
if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil {
if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -589,7 +589,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {
// Otherwise, all three should succeed.
baRead.Txn = nil
if tsFromServer {
baRead.TimestampFromServerClock = true
baRead.TimestampFromServerClock = (*hlc.ClockTimestamp)(&ts)
verifyNotLeaseHolderErrors(t, baRead, repls, 2)
} else {
testutils.SucceedsSoon(t, func() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool {
// propose writes to Raft.
// 4. the batch needs to be non-locking, because unreplicated locks are only
// held on the leaseholder.
tsFromServerClock := ba.Txn == nil && (ba.Timestamp.IsEmpty() || ba.TimestampFromServerClock)
tsFromServerClock := ba.Txn == nil && (ba.Timestamp.IsEmpty() || ba.TimestampFromServerClock != nil)
if tsFromServerClock {
return false
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (tc *testContext) Sender() kv.Sender {
ba.RangeID = 1
}
if ba.Timestamp.IsEmpty() {
if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Clock()); err != nil {
tc.Fatal(err)
}
}
Expand Down Expand Up @@ -4511,7 +4511,7 @@ func TestEndTxnRollbackAbortedTransaction(t *testing.T) {
var ba roachpb.BatchRequest
gArgs := getArgs(key)
ba.Add(&gArgs)
if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Clock()); err != nil {
t.Fatal(err)
}
_, pErr := tc.Sender().Send(ctx, ba)
Expand Down Expand Up @@ -4677,7 +4677,7 @@ func TestBatchRetryCantCommitIntents(t *testing.T) {
ba.Header = roachpb.Header{Txn: txn}
ba.Add(&put)
assignSeqNumsForReqs(txn, &put)
if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Clock()); err != nil {
t.Fatal(err)
}
br, pErr := tc.Sender().Send(ctx, ba)
Expand Down Expand Up @@ -4845,7 +4845,7 @@ func setupResolutionTest(
var ba roachpb.BatchRequest
ba.Header = h
ba.RangeID = newRepl.RangeID
if err := ba.SetActiveTimestamp(newRepl.store.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(newRepl.store.Clock()); err != nil {
t.Fatal(err)
}
pArgs := putArgs(splitKey.AsRawKey(), []byte("value"))
Expand Down Expand Up @@ -4898,7 +4898,7 @@ func TestEndTxnResolveOnlyLocalIntents(t *testing.T) {
ba.Header.RangeID = newRepl.RangeID
gArgs := getArgs(splitKey)
ba.Add(&gArgs)
if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Clock()); err != nil {
t.Fatal(err)
}
_, pErr := newRepl.Send(ctx, ba)
Expand Down Expand Up @@ -7501,7 +7501,7 @@ func TestReplicaCancelRaft(t *testing.T) {
ba.Add(&roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
})
if err := ba.SetActiveTimestamp(tc.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(tc.Clock()); err != nil {
t.Fatal(err)
}
_, pErr := tc.repl.executeBatchWithConcurrencyRetries(ctx, &ba, (*Replica).executeWriteBatch)
Expand Down Expand Up @@ -9128,7 +9128,7 @@ func TestNoopRequestsNotProposed(t *testing.T) {
ba.Header.RangeID = repl.RangeID
ba.Add(req)
ba.Txn = txn
if err := ba.SetActiveTimestamp(repl.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(repl.Clock()); err != nil {
t.Fatal(err)
}
_, pErr := repl.Send(ctx, ba)
Expand Down Expand Up @@ -9421,7 +9421,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
ba.Header.Txn = txn
ba.Add(&etArgs)
assignSeqNumsForReqs(txn, &etArgs)
require.NoError(t, ba.SetActiveTimestamp(func() hlc.Timestamp { return hlc.Timestamp{} }))
require.NoError(t, ba.SetActiveTimestamp(s.Clock()))
// Get a reference to the txn's replica.
stores := s.GetStores().(*Stores)
store, err := stores.GetStore(s.GetFirstStoreID())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Store) Send(
ba = newBa
}

if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
if err := ba.SetActiveTimestamp(s.Clock()); err != nil {
return nil, roachpb.NewError(err)
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2135,14 +2135,24 @@ message Header {
// (txn.WriteTimestamp).
util.hlc.Timestamp timestamp = 1 [(gogoproto.nullable) = false];
// timestamp_from_server_clock is set when a non-transactional BatchRequest
// has its timestamp initialized to the wall time of the server node. This
// flag is needed to ensure that such requests are never served as follower
// has its timestamp initialized to the wall time of the server node. When
// non-nil, this denotes the clock timestamp that the timestamp field was
// initially assigned upon arriving at the server node.
//
// It is needed to ensure that such requests are never served as follower
// reads. Initializing the timestamp of a request on a node that holds a
// follower instead of the leaseholder for a range and then using this
// timestamp to deem a follower read safe could allow for consistency
// violations where a non-transactional read following a write could fail to
// observe the write.
bool timestamp_from_server_clock = 20;
//
// It is also needed to record the time at which the request was received by
// the server node. The operation timestamp cannot serve this role because it
// can change due to server-side uncertainty retries. By remembering a stable
// reference to the initial timestamp, we ensure that a non-transactional
// request's uncertainty interval remains fixed across retries.
util.hlc.Timestamp timestamp_from_server_clock = 27 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];
// replica specifies the destination of the request.
ReplicaDescriptor replica = 2 [(gogoproto.nullable) = false];
// range_id specifies the ID of the Raft consensus group which the key
Expand Down Expand Up @@ -2376,7 +2386,7 @@ message Header {

util.tracing.tracingpb.TraceInfo trace_info = 25 [(gogoproto.nullable) = false];

reserved 7, 10, 12, 14;
reserved 7, 10, 12, 14, 20;
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
Expand Down
11 changes: 6 additions & 5 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (h Header) RequiredFrontier() hlc.Timestamp {
// carried out. For transactional requests, ba.Timestamp must be zero initially
// and it will be set to txn.ReadTimestamp (note though this mostly impacts
// reads; writes use txn.WriteTimestamp). For non-transactional requests, if no
// timestamp is specified, nowFn is used to create and set one.
func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
// timestamp is specified, clock is used to create and set one.
func (ba *BatchRequest) SetActiveTimestamp(clock *hlc.Clock) error {
if txn := ba.Txn; txn != nil {
if !ba.Timestamp.IsEmpty() {
return errors.New("transactional request must not set batch timestamp")
Expand All @@ -71,10 +71,11 @@ func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
// txn.WriteTimestamp, regardless of the batch timestamp.
ba.Timestamp = txn.ReadTimestamp
} else {
// When not transactional, allow empty timestamp and use nowFn instead
// When not transactional, allow empty timestamp and use clock instead.
if ba.Timestamp.IsEmpty() {
ba.Timestamp = nowFn()
ba.TimestampFromServerClock = true
now := clock.NowAsClockTimestamp()
ba.Timestamp = now.ToTimestamp() // copies value, not aliasing reference
ba.TimestampFromServerClock = &now
}
}
return nil
Expand Down

0 comments on commit 4d9a372

Please sign in to comment.