kv: don't pass clock information through Raft log
This commit eliminates the primary mechanism that we use to pass clock
information from a leaseholder, through Raft log entries, to a Range's
followers. As we found in cockroachdb#72278, this was only needed for correctness
in a few specific cases — namely lease transfers and range merges. These
two operations continue to pass clock signals through more explicit
channels, but we remove the unnecessary general case.

This allows us to remove one of the two remaining places where we convert
a `Timestamp` to a `ClockTimestamp` through the `TryToClockTimestamp`
method. As outlined in cockroachdb#72121 (comment),
I would like to remove the ability to downcast a "data-plane" `Timestamp`
to a "control-plane" `ClockTimestamp` entirely. This will clarify the role
of `ClockTimestamps` in the system and clean up the channels through
which clock information is passed between nodes.

The other place where we cast from `Timestamp` to `ClockTimestamp` is
in `Store.Send`, at the boundary of KV RPCs. I would also like to get
rid of this, but doing so needs to wait on cockroachdb#73732.
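
For context, the mechanism being removed worked roughly like this: every
applied command carried a WriteTimestamp, which followers downcast from a
"data-plane" `Timestamp` to a "control-plane" `ClockTimestamp` and fed into
their local HLC during Raft application. The following is a minimal,
self-contained sketch of that pattern using toy types (not the real
`pkg/util/hlc` implementations):

```go
package main

import "fmt"

// Toy stand-ins for hlc.Timestamp and hlc.ClockTimestamp. A "data-plane"
// Timestamp may be synthetic (not tied to a real clock reading); a
// "control-plane" ClockTimestamp is guaranteed to have come from a clock.
type Timestamp struct {
	WallTime  int64
	Logical   int32
	Synthetic bool
}

type ClockTimestamp struct {
	WallTime int64
	Logical  int32
}

// tryToClockTimestamp mimics the TryToClockTimestamp downcast that this
// commit removes from the Raft apply path: it succeeds only for
// non-synthetic timestamps.
func tryToClockTimestamp(ts Timestamp) (ClockTimestamp, bool) {
	if ts.Synthetic {
		return ClockTimestamp{}, false
	}
	return ClockTimestamp{WallTime: ts.WallTime, Logical: ts.Logical}, true
}

// Clock is a toy HLC that only ever moves forward.
type Clock struct{ now ClockTimestamp }

// Update forwards the clock to an observed timestamp, never regressing it.
func (c *Clock) Update(obs ClockTimestamp) {
	if obs.WallTime > c.now.WallTime ||
		(obs.WallTime == c.now.WallTime && obs.Logical > c.now.Logical) {
		c.now = obs
	}
}

func main() {
	var follower Clock
	// Old behavior: every applied command's WriteTimestamp was downcast and
	// folded into the follower's clock during Raft application.
	writeTS := Timestamp{WallTime: 100, Logical: 2}
	if cts, ok := tryToClockTimestamp(writeTS); ok {
		follower.Update(cts)
	}
	fmt.Println(follower.now) // {100 2}
}
```

With this change, only the operations that still need the clock signal carry
it explicitly: lease transfers through the new lease's start time and range
merges through the merge's freeze time.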
nvanbenschoten committed Feb 5, 2022
1 parent 6ea73f4 commit f16f5cd
Showing 10 changed files with 59 additions and 152 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-58 set the active cluster version in the format '<major>.<minor>'
version version 21.2-60 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-58</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-60</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
10 changes: 8 additions & 2 deletions pkg/clusterversion/cockroach_versions.go
@@ -263,7 +263,10 @@ const (
// PostAddRaftAppliedIndexTermMigration is used for asserting that
// RaftAppliedIndexTerm is populated.
PostAddRaftAppliedIndexTermMigration

// DontProposeWriteTimestampForLeaseTransfers stops setting the WriteTimestamp
// on lease transfer Raft proposals. New leaseholders now forward their clock
// directly to the new lease start time.
DontProposeWriteTimestampForLeaseTransfers
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
@@ -407,7 +410,6 @@ var versionsSingleton = keyedVersions{
Key: RemoveIncompatibleDatabasePrivileges,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54},
},

{
Key: AddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 56},
@@ -416,6 +418,10 @@ var versionsSingleton = keyedVersions{
Key: PostAddRaftAppliedIndexTermMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 58},
},
{
Key: DontProposeWriteTimestampForLeaseTransfers,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 60},
},

// *************************************************
// Step (2): Add new versions here.
14 changes: 8 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
@@ -68,13 +68,7 @@ func TransferLease(
// LeaseRejectedError before going through Raft.
prevLease, _ := cArgs.EvalCtx.GetLease()

// Forward the lease's start time to a current clock reading. At this
// point, we're holding latches across the entire range, we know that
// this time is greater than the timestamps at which any request was
// serviced by the leaseholder before it stopped serving requests (i.e.
// before the TransferLease request acquired latches).
newLease := args.Lease
newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
args.Lease = roachpb.Lease{} // prevent accidental use below

// If this check is removed at some point, the filtering of learners on the
@@ -104,6 +98,14 @@ func TransferLease(
// and be rejected with the correct error below Raft.
cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)

// Forward the lease's start time to a current clock reading. At this
// point, we're holding latches across the entire range, we know that
// this time is greater than the timestamps at which any request was
// serviced by the leaseholder before it stopped serving requests (i.e.
// before the TransferLease request acquired latches and before the
// previous lease was revoked).
newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())

// Collect a read summary from the outgoing leaseholder to ship to the
// incoming leaseholder. This is used to instruct the new leaseholder on how
// to update its timestamp cache to ensure that no future writes are allowed
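
The ordering in the hunk above is what makes the new approach safe: the clock
reading that becomes the lease's start time is taken only after the previous
lease has been revoked, so it exceeds every timestamp served under that lease.
A toy sketch of the sequence, assuming a simple monotonic clock rather than
the real kvserver types:

```go
package main

import "fmt"

// tick backs a toy clock: each call to now returns a strictly larger reading.
var tick int64

func now() int64 { tick++; return tick }

func main() {
	// Requests served under the outgoing lease observe clock readings taken
	// before revocation.
	lastServed := now()

	// Revoke the old lease (no further requests are served under it), then
	// take a fresh clock reading for the new lease's start time.
	newLeaseStart := now()

	// The new lease therefore starts above anything the old lease served.
	fmt.Println(newLeaseStart > lastServed) // true
}
```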
111 changes: 10 additions & 101 deletions pkg/kv/kvserver/client_replica_test.go
@@ -64,9 +64,9 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
)

// TestReplicaClockUpdates verifies that the leaseholder and followers both
// update their clocks when executing a command to the command's timestamp, as
// long as the request timestamp is from a clock (i.e. is not synthetic).
// TestReplicaClockUpdates verifies that the leaseholder updates its clock
// when executing a command to the command's timestamp, as long as the
// request timestamp is from a clock (i.e. is not synthetic).
func TestReplicaClockUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -125,43 +125,14 @@ func TestReplicaClockUpdates(t *testing.T) {
t.Fatal(err)
}

// If writing, wait for that command to execute on all the replicas.
// Consensus is asynchronous outside of the majority quorum, and Raft
// application is asynchronous on all nodes.
if write {
testutils.SucceedsSoon(t, func() error {
var values []int64
for i := range tc.Servers {
val, _, err := storage.MVCCGet(ctx,
tc.GetFirstStoreFromServer(t, i).Engine(), reqKey, reqTS,
storage.MVCCGetOptions{})
if err != nil {
return err
}
values = append(values, mustGetInt(val))
}
if !reflect.DeepEqual(values, []int64{5, 5, 5}) {
return errors.Errorf("expected (5, 5, 5), got %v", values)
}
return nil
})
}

// Verify that clocks were updated as expected. Check all clocks if we
// issued a write, but only the leaseholder's if we issued a read. In
// theory, we should be able to assert that _only_ the leaseholder's
// clock is updated by a read, but in practice an assertion against
// followers' clocks being updated is very difficult to make without
// being flaky because it's difficult to prevent other channels
// (background work, etc.) from carrying the clock update.
// Verify that clocks were updated as expected. Only the leaseholder should
// have updated its clock for either a read or a write. In theory, we should
// be able to assert that _only_ the leaseholder's clock is updated, but in
// practice an assertion against followers' clocks being updated is very
// difficult to make without being flaky because it's difficult to prevent
// other channels (background work, etc.) from carrying the clock update.
expUpdated := !synthetic
clocksToCheck := clocks
if !write {
clocksToCheck = clocks[:1]
}
for _, c := range clocksToCheck {
require.Equal(t, expUpdated, reqTS.Less(c.Now()))
}
require.Equal(t, expUpdated, reqTS.Less(clocks[0].Now()))
}

testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
@@ -171,68 +142,6 @@ func TestReplicaClockUpdates(t *testing.T) {
})
}

// TestFollowersDontRejectClockUpdateWithJump verifies that followers update
// their clocks when executing a command, even if the leaseholder's clock is
// far in the future.
func TestFollowersDontRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numNodes = 3
var manuals []*hlc.HybridManualClock
var clocks []*hlc.Clock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewHybridManualClock())
}
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manuals[i].UnixNano,
},
},
}
}
ctx := context.Background()
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

store := tc.GetFirstStoreFromServer(t, 0)
reqKey := roachpb.Key("a")
tc.SplitRangeOrFatal(t, reqKey)
tc.AddVotersOrFatal(t, reqKey, tc.Targets(1, 2)...)

for i, s := range tc.Servers {
clocks = append(clocks, s.Clock())
manuals[i].Pause()
}
// Advance the lease holder's clock ahead of the followers (by more than
// MaxOffset but less than the range lease) and execute a command.
manuals[0].Increment(int64(500 * time.Millisecond))
incArgs := incrementArgs(reqKey, 5)
ts := clocks[0].Now()
if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil {
t.Fatal(err)
}
// Wait for that command to execute on all the followers.
tc.WaitForValues(t, reqKey, []int64{5, 5, 5})

// Verify that all the followers have accepted the clock update from
// node 0 even though it comes from outside the usual max offset.
now := clocks[0].Now()
for i, clock := range clocks {
// Only compare the WallTimes: it's normal for clock 0 to be a few logical ticks ahead.
if clock.Now().WallTime < now.WallTime {
t.Errorf("clock %d is behind clock 0: %s vs %s", i, clock.Now(), now)
}
}
}

// TestLeaseholdersRejectClockUpdateWithJump verifies that leaseholders reject
// commands that would cause a large time jump.
func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
@@ -128,10 +128,8 @@ message ReplicatedEvalResult {
bool is_probe = 23;
// The timestamp at which this command is writing. Used to verify the validity
// of the command against the GC threshold and to update the followers'
// clocks. If the request that produced this command is not a write that cares
// about the timestamp cache, then the request's write timestamp is
// meaningless; for such request's, this field is simply a clock reading from
// the proposer.
// clocks. Only set if the request that produced this command is a write that
// cares about the timestamp cache.
util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
// The stats delta corresponding to the data in this WriteBatch. On
// a split, contains only the contributions to the left-hand side.
22 changes: 1 addition & 21 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -425,11 +424,6 @@ type replicaAppBatch struct {
// replicaState other than Stats are overwritten completely rather than
// updated in-place.
stats enginepb.MVCCStats
// maxTS is the maximum clock timestamp that this command carries. Timestamps
// come from the writes that are part of this command, and also from the
// closed timestamp carried by this command. Synthetic timestamps are not
// registered here.
maxTS hlc.ClockTimestamp
// changeRemovesReplica tracks whether the command in the batch (there must
// be only one) removes this replica from the range.
changeRemovesReplica bool
@@ -522,11 +516,6 @@ func (b *replicaAppBatch) Stage(
cmd.splitMergeUnlock = splitMergeUnlock
}

// Update the batch's max timestamp.
if clockTS, ok := cmd.replicatedResult().WriteTimestamp.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}

// Normalize the command, accounting for past migrations.
b.migrateReplicatedResult(ctx, cmd)

@@ -873,9 +862,6 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
b.state.RaftClosedTimestamp = *cts
b.closedTimestampSetter.record(cmd, b.state.Lease)
if clockTS, ok := cts.TryToClockTimestamp(); ok {
b.maxTS.Forward(clockTS)
}
}

res := cmd.replicatedResult()
@@ -897,13 +883,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
log.Infof(ctx, "flushing batch %v of %d entries", b.state, b.entries)
}

// Update the node clock with the maximum timestamp of all commands in the
// batch. This maintains a high water mark for all ops serviced, so that
// received ops without a timestamp specified are guaranteed one higher than
// any op already executed for overlapping keys.
r := b.r
r.store.Clock().Update(b.maxTS)

// Add the replica applied state key to the write batch if this change
// doesn't remove us.
if !b.changeRemovesReplica {
Expand All @@ -928,6 +907,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
b.batch = nil

// Update the replica's applied indexes, mvcc stats and closed timestamp.
r := b.r
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
// RaftAppliedIndexTerm will be non-zero only when the
23 changes: 18 additions & 5 deletions pkg/kv/kvserver/replica_proposal.go
@@ -18,6 +18,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
@@ -407,6 +408,12 @@ func (r *Replica) leasePostApplyLocked(
newLease, err)
}

// Forward the node clock to the start time of the new lease. This ensures
// that the leaseholder's clock always leads its lease's start time. For an
// explanation about why this is needed, see "Cooperative lease transfers"
// in pkg/util/hlc/doc.go.
r.Clock().Update(newLease.Start)

// If this replica is a new holder of the lease, update the timestamp
// cache. Note that clock offset scenarios are handled via a stasis
// period inherent in the lease which is documented in the Lease struct.
@@ -854,11 +861,17 @@ func (r *Replica) evaluateProposal(
if ba.AppliesTimestampCache() {
res.Replicated.WriteTimestamp = ba.WriteTimestamp()
} else {
// For misc requests, use WriteTimestamp to propagate a clock signal. This
// is particularly important for lease transfers, as it assures that the
// follower getting the lease will have a clock above the start time of
// its lease.
res.Replicated.WriteTimestamp = r.store.Clock().Now()
if !r.ClusterSettings().Version.IsActive(ctx, clusterversion.DontProposeWriteTimestampForLeaseTransfers) {
// For misc requests, use WriteTimestamp to propagate a clock signal. This
// is particularly important for lease transfers, as it assures that the
// follower getting the lease will have a clock above the start time of
// its lease.
//
// This is no longer needed in v22.1 because nodes running v22.1 and
// above will update their clock directly from the new lease's start
// time.
res.Replicated.WriteTimestamp = r.store.Clock().Now()
}
}
res.Replicated.Delta = ms.ToStatsDelta()

7 changes: 3 additions & 4 deletions pkg/kv/kvserver/store_merge.go
@@ -172,10 +172,9 @@ func (s *Store) MergeRange(
// reads all the way up to freezeStart, the time at which the RHS
// promised to stop serving traffic.
//
// Note that we need to update our clock with freezeStart to preserve
// the invariant that our clock is always greater than or equal to any
// timestamps in the timestamp cache. For a full discussion, see the
// comment on TestStoreRangeMergeTimestampCacheCausality.
// For an explanation of why we need to update our clock with the
// merge's freezeStart, see "Range merges" in pkg/util/hlc/doc.go. Also,
// see the comment on TestStoreRangeMergeTimestampCacheCausality.
s.Clock().Update(freezeStart)

var sum rspb.ReadSummary
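
The merge case follows the same shape. A compact sketch, assuming toy integer
timestamps rather than the real Store and Clock types: the subsuming store
forwards its clock to the right-hand side's freezeStart before serving the
combined keyspace, so its clock leads every timestamp the RHS ever served.

```go
package main

import "fmt"

// forward returns the larger of a clock reading and an observed timestamp,
// mirroring the forward-only clock update used during a merge.
func forward(clock, observed int64) int64 {
	if observed > clock {
		return observed
	}
	return clock
}

func main() {
	lhsClock := int64(90)     // subsuming store's clock
	freezeStart := int64(100) // time at which the RHS stopped serving traffic

	lhsClock = forward(lhsClock, freezeStart)

	// Any request served by the RHS happened at or below freezeStart, so the
	// LHS clock now leads every timestamp the merged range has served.
	fmt.Println(lhsClock >= freezeStart) // true
}
```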
14 changes: 7 additions & 7 deletions pkg/util/hlc/doc.go
@@ -31,10 +31,11 @@ There are currently three channels through which HLC timestamps are passed
between nodes in a cluster:
- Raft (unidirectional): proposers of Raft commands (i.e. leaseholders) attach
clock readings to these command, which are later consumed by followers when
commands are applied to their Raft state machine.
clock readings to some of these commands (e.g. lease transfers, range merges),
which are later consumed by followers when commands are applied to their Raft
state machine.
Ref: (kvserverpb.ReplicatedEvalResult).WriteTimestamp.
Ref: (roachpb.Lease).Start.
Ref: (roachpb.MergeTrigger).FreezeStart.
- BatchRequest API (bidirectional): clients and servers of the KV BatchRequest
@@ -82,10 +83,9 @@ TODO(nvanbenschoten): Update the above on written timestamps after #72121.
transfer from one replica of a range to another, the outgoing leaseholder
revokes its lease before its expiration time and consults its clock to
determine the start time of the next lease. It then proposes this new lease
through Raft (see the raft channel above) with a clock reading attached that
is >= the new lease's start time. Upon application of this Raft entry, the
incoming leaseholder forwards its HLC to this clock reading, transitively
ensuring that its clock is >= the new lease's start time.
through Raft (see the raft channel above). Upon application of this Raft
entry, the incoming leaseholder forwards its HLC to the start time of the
lease, ensuring that its clock is >= the new lease's start time.
The invariant that a leaseholder's clock is always >= its lease's start time
is used in a few places. First, it ensures that the leaseholder's clock
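
Putting the lease-transfer channel together end to end, here is a minimal
sketch of the invariant described above, with two toy in-process "nodes"
standing in for the outgoing and incoming leaseholders (not the real replica
types): the incoming leaseholder forwards its clock to the new lease's start
time when the transfer applies, so its clock always leads the lease.

```go
package main

import "fmt"

// node is a toy replica with a forward-only clock.
type node struct{ clock int64 }

func (n *node) now() int64 { n.clock++; return n.clock }

func (n *node) update(observed int64) {
	if observed > n.clock {
		n.clock = observed
	}
}

func main() {
	outgoing := node{clock: 100} // outgoing leaseholder, clock ahead
	incoming := node{clock: 10}  // incoming leaseholder, clock behind

	// The outgoing leaseholder revokes its lease and reads its clock to
	// pick the new lease's start time (cmd_lease_transfer.go above).
	leaseStart := outgoing.now()

	// When the lease-transfer entry applies, the incoming leaseholder
	// forwards its clock to the lease start (replica_proposal.go above),
	// rather than relying on a WriteTimestamp carried by the Raft entry.
	incoming.update(leaseStart)

	fmt.Println(incoming.clock >= leaseStart) // true: clock leads lease start
}
```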
