From f16f5cd28bc9135c3d9b1aa27422dee6f4eb7a8c Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 4 Feb 2022 15:17:45 -0500
Subject: [PATCH] kv: don't pass clock information through Raft log
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit eliminates the primary mechanism that we use to pass clock
information from a leaseholder, through Raft log entries, to a Range's
followers. As we found in #72278, this was only needed for correctness
in a few specific cases — namely lease transfers and range merges. These
two operations continue to pass clock signals through more explicit
channels, but we remove the unnecessary general case.

This allows us to remove one of the two remaining places where we
convert a `Timestamp` to a `ClockTimestamp` through the
`TryToClockTimestamp` method. As outlined in
https://github.com/cockroachdb/cockroach/pull/72121#issuecomment-954433047,
I would like to remove the ability to downcast a "data-plane"
`Timestamp` to a "control-plane" `ClockTimestamp` entirely. This will
clarify the role of `ClockTimestamps` in the system and clean up the
channels through which clock information is passed between nodes.

The other place where we cast from `Timestamp` to `ClockTimestamp` is
in `Store.Send`, at the boundary of KV RPCs. I would also like to get
rid of this, but doing so needs to wait on #73732.
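To make the new flow concrete, here is a minimal sketch of the idea,
using simplified stand-ins for `hlc.Clock` and the lease state rather
than the real `leasePostApplyLocked` types:

    // Hypothetical, simplified types for illustration only; the real
    // implementations live in pkg/util/hlc and pkg/kv/kvserver.
    package main

    import "fmt"

    // ClockTimestamp is a reading taken directly from a node's HLC
    // ("control-plane" time); it is always safe to feed back into a clock.
    type ClockTimestamp struct{ WallTime int64 }

    // Clock is a trivial monotonic clock.
    type Clock struct{ now ClockTimestamp }

    // Update forwards the clock to the given reading, mirroring the
    // monotonicity guarantee of hlc.Clock.Update.
    func (c *Clock) Update(ts ClockTimestamp) {
        if c.now.WallTime < ts.WallTime {
            c.now = ts
        }
    }

    // applyNewLease models what the incoming leaseholder now does: rather
    // than consuming a clock reading smuggled through the Raft log, it
    // forwards its clock directly to the new lease's start time.
    func applyNewLease(c *Clock, leaseStart ClockTimestamp) {
        c.Update(leaseStart)
    }

    func main() {
        follower := &Clock{now: ClockTimestamp{WallTime: 100}}
        leaseStart := ClockTimestamp{WallTime: 150} // chosen by the outgoing leaseholder
        applyNewLease(follower, leaseStart)
        fmt.Println(follower.now.WallTime >= leaseStart.WallTime) // true
    }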
---
 .../settings/settings-for-tenants.txt         |   2 +-
 docs/generated/settings/settings.html         |   2 +-
 pkg/clusterversion/cockroach_versions.go      |  10 +-
 .../kvserver/batcheval/cmd_lease_transfer.go  |  14 ++-
 pkg/kv/kvserver/client_replica_test.go        | 111 ++----------------
 pkg/kv/kvserver/kvserverpb/proposer_kv.proto  |   6 +-
 .../replica_application_state_machine.go      |  22 +---
 pkg/kv/kvserver/replica_proposal.go           |  23 +++-
 pkg/kv/kvserver/store_merge.go                |   7 +-
 pkg/util/hlc/doc.go                           |  14 +--
 10 files changed, 59 insertions(+), 152 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 44204b8b12c..c2a157b8472 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
 trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used.
 trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used.
 trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used.
-version version 21.2-58 set the active cluster version in the format '.'
+version version 21.2-60 set the active cluster version in the format '.'
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 5e187f6be60..783bdef619a 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -186,6 +186,6 @@
 trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used.
 trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used.
 trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used.
-versionversion21.2-58set the active cluster version in the format '.'
+versionversion21.2-60set the active cluster version in the format '.'
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index 808a6e73ae0..e74010c036b 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -263,7 +263,10 @@ const (
     // PostAddRaftAppliedIndexTermMigration is used for asserting that
     // RaftAppliedIndexTerm is populated.
     PostAddRaftAppliedIndexTermMigration
-
+    // DontProposeWriteTimestampForLeaseTransfers stops setting the WriteTimestamp
+    // on lease transfer Raft proposals. New leaseholders now forward their clock
+    // directly to the new lease start time.
+    DontProposeWriteTimestampForLeaseTransfers
     // *************************************************
     // Step (1): Add new versions here.
     // Do not add new versions to a patch release.
@@ -407,7 +410,6 @@ var versionsSingleton = keyedVersions{
         Key:     RemoveIncompatibleDatabasePrivileges,
         Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54},
     },
-
     {
         Key:     AddRaftAppliedIndexTermMigration,
         Version: roachpb.Version{Major: 21, Minor: 2, Internal: 56},
     },
@@ -416,6 +418,10 @@ var versionsSingleton = keyedVersions{
         Key:     PostAddRaftAppliedIndexTermMigration,
         Version: roachpb.Version{Major: 21, Minor: 2, Internal: 58},
     },
+    {
+        Key:     DontProposeWriteTimestampForLeaseTransfers,
+        Version: roachpb.Version{Major: 21, Minor: 2, Internal: 60},
+    },

     // *************************************************
     // Step (2): Add new versions here.
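The new cluster version above gates the behavior change during
mixed-version operation. Here is a rough sketch of the gating pattern,
with simplified names standing in for the real `clusterversion` API:

    // Simplified sketch of mixed-version gating; not the real API.
    package main

    import "fmt"

    type Version int

    const (
        V21_2 Version = iota
        DontProposeWriteTimestampForLeaseTransfers
    )

    type ClusterSettings struct{ active Version }

    // IsActive reports whether all nodes are known to run at least v,
    // and therefore all understand the new behavior.
    func (s ClusterSettings) IsActive(v Version) bool { return s.active >= v }

    func main() {
        old := ClusterSettings{active: V21_2}
        upgraded := ClusterSettings{active: DontProposeWriteTimestampForLeaseTransfers}
        // Old clusters keep attaching clock readings to proposals;
        // upgraded clusters rely on the lease start time instead.
        fmt.Println(!old.IsActive(DontProposeWriteTimestampForLeaseTransfers))      // true
        fmt.Println(!upgraded.IsActive(DontProposeWriteTimestampForLeaseTransfers)) // false
    }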
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
index a91ae04db97..0a0f92f12a5 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
@@ -68,13 +68,7 @@ func TransferLease(
     // LeaseRejectedError before going through Raft.
     prevLease, _ := cArgs.EvalCtx.GetLease()

-    // Forward the lease's start time to a current clock reading. At this
-    // point, we're holding latches across the entire range, we know that
-    // this time is greater than the timestamps at which any request was
-    // serviced by the leaseholder before it stopped serving requests (i.e.
-    // before the TransferLease request acquired latches).
     newLease := args.Lease
-    newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
     args.Lease = roachpb.Lease{} // prevent accidental use below

     // If this check is removed at some point, the filtering of learners on the
@@ -104,6 +98,14 @@ func TransferLease(
     // and be rejected with the correct error below Raft.
     cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)

+    // Forward the lease's start time to a current clock reading. At this
+    // point, we're holding latches across the entire range, so we know that
+    // this time is greater than the timestamps at which any request was
+    // serviced by the leaseholder before it stopped serving requests (i.e.
+    // before the TransferLease request acquired latches and before the
+    // previous lease was revoked).
+    newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
+
     // Collect a read summary from the outgoing leaseholder to ship to the
     // incoming leaseholder. This is used to instruct the new leaseholder on how
     // to update its timestamp cache to ensure that no future writes are allowed
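The important part of the change above is ordering: the clock is now
read only after the previous lease has been revoked, so the chosen start
time bounds every timestamp served under the old lease. A toy model of
that ordering, with hypothetical names rather than the real EvalContext:

    // Hypothetical model of the transfer ordering; not real kvserver code.
    package main

    import "fmt"

    type node struct {
        clock       int64 // simplified HLC reading
        leaseActive bool
    }

    // serveRequest forwards the clock, as serving any request can.
    func (n *node) serveRequest(ts int64) bool {
        if !n.leaseActive {
            return false // revoked: no new requests can be served
        }
        if n.clock < ts {
            n.clock = ts
        }
        return true
    }

    func main() {
        outgoing := &node{clock: 10, leaseActive: true}
        outgoing.serveRequest(20)

        // Revoke first, then read the clock: every timestamp served under
        // the old lease is now below the chosen lease start time.
        outgoing.leaseActive = false
        newLeaseStart := outgoing.clock + 1
        fmt.Println(outgoing.serveRequest(25))      // false: lease revoked
        fmt.Println(newLeaseStart > outgoing.clock) // true
    }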
diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go
index f52a0e42c05..4cc76797998 100644
--- a/pkg/kv/kvserver/client_replica_test.go
+++ b/pkg/kv/kvserver/client_replica_test.go
@@ -64,9 +64,9 @@ import (
     "go.etcd.io/etcd/raft/v3/raftpb"
 )

-// TestReplicaClockUpdates verifies that the leaseholder and followers both
-// update their clocks when executing a command to the command's timestamp, as
-// long as the request timestamp is from a clock (i.e. is not synthetic).
+// TestReplicaClockUpdates verifies that the leaseholder updates its clock
+// when executing a command to the command's timestamp, as long as the
+// request timestamp is from a clock (i.e. is not synthetic).
 func TestReplicaClockUpdates(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
@@ -125,43 +125,14 @@ func TestReplicaClockUpdates(t *testing.T) {
         t.Fatal(err)
     }

-    // If writing, wait for that command to execute on all the replicas.
-    // Consensus is asynchronous outside of the majority quorum, and Raft
-    // application is asynchronous on all nodes.
-    if write {
-        testutils.SucceedsSoon(t, func() error {
-            var values []int64
-            for i := range tc.Servers {
-                val, _, err := storage.MVCCGet(ctx,
-                    tc.GetFirstStoreFromServer(t, i).Engine(), reqKey, reqTS,
-                    storage.MVCCGetOptions{})
-                if err != nil {
-                    return err
-                }
-                values = append(values, mustGetInt(val))
-            }
-            if !reflect.DeepEqual(values, []int64{5, 5, 5}) {
-                return errors.Errorf("expected (5, 5, 5), got %v", values)
-            }
-            return nil
-        })
-    }
-
-    // Verify that clocks were updated as expected. Check all clocks if we
-    // issued a write, but only the leaseholder's if we issued a read. In
-    // theory, we should be able to assert that _only_ the leaseholder's
-    // clock is updated by a read, but in practice an assertion against
-    // followers' clocks being updated is very difficult to make without
-    // being flaky because it's difficult to prevent other channels
-    // (background work, etc.) from carrying the clock update.
+    // Verify that clocks were updated as expected. Only the leaseholder should
+    // have updated its clock for either a read or a write. In theory, we should
+    // be able to assert that _only_ the leaseholder's clock is updated, but in
+    // practice an assertion against followers' clocks being updated is very
+    // difficult to make without being flaky because it's difficult to prevent
+    // other channels (background work, etc.) from carrying the clock update.
     expUpdated := !synthetic
-    clocksToCheck := clocks
-    if !write {
-        clocksToCheck = clocks[:1]
-    }
-    for _, c := range clocksToCheck {
-        require.Equal(t, expUpdated, reqTS.Less(c.Now()))
-    }
+    require.Equal(t, expUpdated, reqTS.Less(clocks[0].Now()))
 }

 testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
@@ -171,68 +142,6 @@ func TestReplicaClockUpdates(t *testing.T) {
     })
 }

-// TestFollowersDontRejectClockUpdateWithJump verifies that followers update
-// their clocks when executing a command, even if the leaseholder's clock is
-// far in the future.
-func TestFollowersDontRejectClockUpdateWithJump(t *testing.T) {
-    defer leaktest.AfterTest(t)()
-    defer log.Scope(t).Close(t)
-
-    const numNodes = 3
-    var manuals []*hlc.HybridManualClock
-    var clocks []*hlc.Clock
-    for i := 0; i < numNodes; i++ {
-        manuals = append(manuals, hlc.NewHybridManualClock())
-    }
-    serverArgs := make(map[int]base.TestServerArgs)
-    for i := 0; i < numNodes; i++ {
-        serverArgs[i] = base.TestServerArgs{
-            Knobs: base.TestingKnobs{
-                Server: &server.TestingKnobs{
-                    ClockSource: manuals[i].UnixNano,
-                },
-            },
-        }
-    }
-    ctx := context.Background()
-    tc := testcluster.StartTestCluster(t, numNodes,
-        base.TestClusterArgs{
-            ReplicationMode:   base.ReplicationManual,
-            ServerArgsPerNode: serverArgs,
-        })
-    defer tc.Stopper().Stop(ctx)
-
-    store := tc.GetFirstStoreFromServer(t, 0)
-    reqKey := roachpb.Key("a")
-    tc.SplitRangeOrFatal(t, reqKey)
-    tc.AddVotersOrFatal(t, reqKey, tc.Targets(1, 2)...)
-
-    for i, s := range tc.Servers {
-        clocks = append(clocks, s.Clock())
-        manuals[i].Pause()
-    }
-    // Advance the lease holder's clock ahead of the followers (by more than
-    // MaxOffset but less than the range lease) and execute a command.
-    manuals[0].Increment(int64(500 * time.Millisecond))
-    incArgs := incrementArgs(reqKey, 5)
-    ts := clocks[0].Now()
-    if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil {
-        t.Fatal(err)
-    }
-    // Wait for that command to execute on all the followers.
-    tc.WaitForValues(t, reqKey, []int64{5, 5, 5})
-
-    // Verify that all the followers have accepted the clock update from
-    // node 0 even though it comes from outside the usual max offset.
-    now := clocks[0].Now()
-    for i, clock := range clocks {
-        // Only compare the WallTimes: it's normal for clock 0 to be a few logical ticks ahead.
-        if clock.Now().WallTime < now.WallTime {
-            t.Errorf("clock %d is behind clock 0: %s vs %s", i, clock.Now(), now)
-        }
-    }
-}
-
 // TestLeaseholdersRejectClockUpdateWithJump verifies that leaseholders reject
 // commands that would cause a large time jump.
 func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto
index a06ae500c83..241a0098251 100644
--- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto
+++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto
@@ -128,10 +128,8 @@ message ReplicatedEvalResult {
   bool is_probe = 23;
   // The timestamp at which this command is writing. Used to verify the validity
   // of the command against the GC threshold and to update the followers'
-  // clocks. If the request that produced this command is not a write that cares
-  // about the timestamp cache, then the request's write timestamp is
-  // meaningless; for such request's, this field is simply a clock reading from
-  // the proposer.
+  // clocks. Only set if the request that produced this command is a write that
+  // cares about the timestamp cache.
   util.hlc.Timestamp write_timestamp = 8 [(gogoproto.nullable) = false];
   // The stats delta corresponding to the data in this WriteBatch. On
   // a split, contains only the contributions to the left-hand side.
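Under the revised contract, the proposer populates write_timestamp only
for requests to which the timestamp cache applies. A simplified sketch
of that proposer-side rule (field and predicate names are illustrative,
not the real kvserver API):

    // Simplified sketch of the proposer-side rule; not real kvserver code.
    package main

    import "fmt"

    type Timestamp struct{ WallTime int64 }

    type batch struct {
        appliesTsCache bool      // does the timestamp cache apply to this request?
        writeTimestamp Timestamp // the timestamp the request writes at
    }

    // writeTimestampForProposal returns what gets attached to the Raft
    // proposal: the real write timestamp for cache-respecting writes, and
    // a zero value for everything else.
    func writeTimestampForProposal(ba batch) Timestamp {
        if ba.appliesTsCache {
            return ba.writeTimestamp
        }
        return Timestamp{} // no longer a clock reading from the proposer
    }

    func main() {
        fmt.Println(writeTimestampForProposal(batch{appliesTsCache: true, writeTimestamp: Timestamp{WallTime: 42}}))
        fmt.Println(writeTimestampForProposal(batch{}))
    }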
diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index 16e6e8a104a..01583437cad 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -24,7 +24,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/storage"
     "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
     "github.com/cockroachdb/cockroach/pkg/util/envutil"
-    "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/protoutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -425,11 +424,6 @@ type replicaAppBatch struct {
     // replicaState other than Stats are overwritten completely rather than
     // updated in-place.
     stats enginepb.MVCCStats
-    // maxTS is the maximum clock timestamp that this command carries. Timestamps
-    // come from the writes that are part of this command, and also from the
-    // closed timestamp carried by this command. Synthetic timestamps are not
-    // registered here.
-    maxTS hlc.ClockTimestamp
     // changeRemovesReplica tracks whether the command in the batch (there must
     // be only one) removes this replica from the range.
     changeRemovesReplica bool
@@ -522,11 +516,6 @@ func (b *replicaAppBatch) Stage(
         cmd.splitMergeUnlock = splitMergeUnlock
     }

-    // Update the batch's max timestamp.
-    if clockTS, ok := cmd.replicatedResult().WriteTimestamp.TryToClockTimestamp(); ok {
-        b.maxTS.Forward(clockTS)
-    }
-
     // Normalize the command, accounting for past migrations.
     b.migrateReplicatedResult(ctx, cmd)
@@ -873,9 +862,6 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
     if cts := cmd.raftCmd.ClosedTimestamp; cts != nil && !cts.IsEmpty() {
         b.state.RaftClosedTimestamp = *cts
         b.closedTimestampSetter.record(cmd, b.state.Lease)
-        if clockTS, ok := cts.TryToClockTimestamp(); ok {
-            b.maxTS.Forward(clockTS)
-        }
     }

     res := cmd.replicatedResult()
@@ -897,13 +883,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
         log.Infof(ctx, "flushing batch %v of %d entries", b.state, b.entries)
     }

-    // Update the node clock with the maximum timestamp of all commands in the
-    // batch. This maintains a high water mark for all ops serviced, so that
-    // received ops without a timestamp specified are guaranteed one higher than
-    // any op already executed for overlapping keys.
-    r := b.r
-    r.store.Clock().Update(b.maxTS)
-
     // Add the replica applied state key to the write batch if this change
     // doesn't remove us.
     if !b.changeRemovesReplica {
@@ -928,6 +907,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
     b.batch = nil

     // Update the replica's applied indexes, mvcc stats and closed timestamp.
+    r := b.r
     r.mu.Lock()
     r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
     // RaftAppliedIndexTerm will be non-zero only when the
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index effe9fcdb97..7cc47bab68e 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -18,6 +18,7 @@ import (
     "unsafe"

     "github.com/cockroachdb/cockroach/pkg/base"
+    "github.com/cockroachdb/cockroach/pkg/clusterversion"
     "github.com/cockroachdb/cockroach/pkg/keys"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
@@ -407,6 +408,12 @@ func (r *Replica) leasePostApplyLocked(
             newLease, err)
     }

+    // Forward the node clock to the start time of the new lease. This ensures
+    // that the leaseholder's clock always leads its lease's start time. For an
+    // explanation about why this is needed, see "Cooperative lease transfers"
+    // in pkg/util/hlc/doc.go.
+    r.Clock().Update(newLease.Start)
+
     // If this replica is a new holder of the lease, update the timestamp
     // cache. Note that clock offset scenarios are handled via a stasis
     // period inherent in the lease which is documented in the Lease struct.
@@ -854,11 +861,17 @@ func (r *Replica) evaluateProposal(
     if ba.AppliesTimestampCache() {
         res.Replicated.WriteTimestamp = ba.WriteTimestamp()
     } else {
-        // For misc requests, use WriteTimestamp to propagate a clock signal. This
-        // is particularly important for lease transfers, as it assures that the
-        // follower getting the lease will have a clock above the start time of
-        // its lease.
-        res.Replicated.WriteTimestamp = r.store.Clock().Now()
+        if !r.ClusterSettings().Version.IsActive(ctx, clusterversion.DontProposeWriteTimestampForLeaseTransfers) {
+            // For misc requests, use WriteTimestamp to propagate a clock signal. This
+            // is particularly important for lease transfers, as it assures that the
+            // follower getting the lease will have a clock above the start time of
+            // its lease.
+            //
+            // This is no longer needed in v22.1 because nodes running v22.1 and
+            // above will update their clock directly from the new lease's start
+            // time.
+            res.Replicated.WriteTimestamp = r.store.Clock().Now()
+        }
     }

     res.Replicated.Delta = ms.ToStatsDelta()
diff --git a/pkg/kv/kvserver/store_merge.go b/pkg/kv/kvserver/store_merge.go
index fbd2569bf90..e9409352aee 100644
--- a/pkg/kv/kvserver/store_merge.go
+++ b/pkg/kv/kvserver/store_merge.go
@@ -172,10 +172,9 @@ func (s *Store) MergeRange(
     // reads all the way up to freezeStart, the time at which the RHS
     // promised to stop serving traffic.
     //
-    // Note that we need to update our clock with freezeStart to preserve
-    // the invariant that our clock is always greater than or equal to any
-    // timestamps in the timestamp cache. For a full discussion, see the
-    // comment on TestStoreRangeMergeTimestampCacheCausality.
+    // For an explanation about why we need to update our clock with the
+    // merge's freezeStart, see "Range merges" in pkg/util/hlc/doc.go. Also,
+    // see the comment on TestStoreRangeMergeTimestampCacheCausality.
     s.Clock().Update(freezeStart)

     var sum rspb.ReadSummary
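Range merges keep their explicit clock signal: the subsuming store
forwards its clock to the RHS's freezeStart before serving requests over
the combined keyspace. A minimal sketch of that handshake, with a
simplified clock and hypothetical names:

    // Toy model of the merge-time clock handshake; not real kvserver code.
    package main

    import "fmt"

    type Clock struct{ wall int64 }

    // Update forwards the clock, like hlc.Clock.Update.
    func (c *Clock) Update(ts int64) {
        if c.wall < ts {
            c.wall = ts
        }
    }

    // mergeRange models the relevant step of Store.MergeRange: once the
    // LHS clock is >= freezeStart, every timestamp the RHS ever served is
    // covered by the LHS's clock and timestamp cache.
    func mergeRange(lhs *Clock, freezeStart int64) {
        lhs.Update(freezeStart)
    }

    func main() {
        lhs := &Clock{wall: 10}
        mergeRange(lhs, 25) // RHS promised to serve nothing at or above 25
        fmt.Println(lhs.wall >= 25) // true
    }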
diff --git a/pkg/util/hlc/doc.go b/pkg/util/hlc/doc.go
index 6cb3f6a6327..bc67e1f11a8 100644
--- a/pkg/util/hlc/doc.go
+++ b/pkg/util/hlc/doc.go
@@ -31,10 +31,11 @@ There are currently three channels through which HLC timestamps are passed
 between nodes in a cluster:

  - Raft (unidirectional): proposers of Raft commands (i.e. leaseholders) attach
-   clock readings to these command, which are later consumed by followers when
-   commands are applied to their Raft state machine.
+   clock readings to some of these commands (e.g. lease transfers, range merges),
+   which are later consumed by followers when commands are applied to their Raft
+   state machine.

-   Ref: (kvserverpb.ReplicatedEvalResult).WriteTimestamp.
+   Ref: (roachpb.Lease).Start.
+   Ref: (roachpb.MergeTrigger).FreezeStart.

  - BatchRequest API (bidirectional): clients and servers of the KV BatchRequest
@@ -82,10 +83,9 @@ TODO(nvanbenschoten): Update the above on written timestamps after #72121.
   transfer from one replica of a range to another, the outgoing leaseholder
   revokes its lease before its expiration time and consults its clock to
   determine the start time of the next lease. It then proposes this new lease
-  through Raft (see the raft channel above) with a clock reading attached that
-  is >= the new lease's start time. Upon application of this Raft entry, the
-  incoming leaseholder forwards its HLC to this clock reading, transitively
-  ensuring that its clock is >= the new lease's start time.
+  through Raft (see the raft channel above). Upon application of this Raft
+  entry, the incoming leaseholder forwards its HLC to the start time of the
+  lease, ensuring that its clock is >= the new lease's start time.

 The invariant that a leaseholder's clock is always >= its lease's start time
 is used in a few places. First, it ensures that the leaseholder's clock