From 1631b00e8c04e22bc69c024f5e9d8842519b5fdf Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Thu, 15 Oct 2020 18:47:32 -0400 Subject: [PATCH] kvserver: don't always refuse to take leases when draining Before this patch, a draining node would not take any new leases once it's draining. This is a problem in case all replicas of a range are draining at the same time (e.g. when draining a single-node cluster, or when draining multiple nodes at the same time perhaps by mistake) - nobody wants the lease. Particularly because the liveness range is expiration-based (and thus permanently in need of new leases to be granted), this quickly results in nodes failing their liveness. It also becomes more of a problem with #55148, where we start refusing to take the lease on replicas that are not the leader - so if the leader is draining, we deadlock. This patch makes an exception for leaders, which now no longer refuse the lease even when they're draining. The reasonsing being that it's too easy to deadlock otherwise. Release note: None --- pkg/kv/kvserver/replica_range_lease.go | 18 +- pkg/kv/kvserver/replica_test.go | 157 +++++++++++++++--- .../serverutils/test_cluster_shim.go | 5 + 3 files changed, 154 insertions(+), 26 deletions(-) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index b4ab6989c3ab..c18deb414bf6 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -58,6 +58,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/opentracing/opentracing-go" + "go.etcd.io/etcd/raft" ) var leaseStatusLogLimiter = log.Every(5 * time.Second) @@ -616,8 +617,14 @@ func (r *Replica) requestLeaseLocked( return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError( newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc))) } - if r.store.IsDraining() { - // We've retired from active duty. + // If we're draining, we'd rather not take any new leases (since we're also + // trying to move leases away elsewhere). But if we're the leader, we don't + // really have a choice and we take the lease - there might not be any other + // replica available to take this lease (perhaps they're all draining). + if r.store.IsDraining() && (r.raftBasicStatusRLocked().RaftState != raft.StateLeader) { + // TODO(andrei): If we start refusing to take leases on followers elsewhere, + // this code can go away. + log.VEventf(ctx, 2, "refusing to take the lease because we're draining") return r.mu.pendingLeaseRequest.newResolvedHandle(roachpb.NewError( newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc))) } @@ -834,6 +841,13 @@ func newNotLeaseHolderError( return err } +func (r *Replica) currentLeaseStatus(ctx context.Context) kvserverpb.LeaseStatus { + timestamp := r.store.Clock().Now() + r.mu.RLock() + defer r.mu.RUnlock() + return r.leaseStatus(ctx, *r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS) +} + // leaseGoodToGo is a fast-path for lease checks which verifies that an // existing lease is valid and owned by the current store. This method should // not be called directly. Use redirectOnOrAcquireLease instead. diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index df9b0d969cc7..a2ea9dee0095 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" @@ -1428,36 +1429,144 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { } } -// TestReplicaDrainLease makes sure that no new leases are granted when -// the Store is draining. +// Test that draining nodes only take the lease if they're the leader. func TestReplicaDrainLease(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(context.Background()) - tc.Start(t, stopper) - - // Acquire initial lease. ctx := context.Background() - status, pErr := tc.repl.redirectOnOrAcquireLease(ctx) - if pErr != nil { - t.Fatal(pErr) + var suspendN1Heartbeats syncutil.AtomicBool + + // This test waits for an epoch-based lease to expire, so we're setting the + // liveness duration as low as possible while still keeping the test stable. + nlKnobs := NodeLivenessTestingKnobs{ + LivenessDuration: 2000 * time.Millisecond, + RenewalDuration: 1000 * time.Millisecond, + } + // We eliminate clock offsets in order to eliminate the stasis period of + // leases. Otherwise we'd need to make leases longer. + storeKnobs := &StoreTestingKnobs{ + MaxOffset: time.Nanosecond, + } + + // rejectLivenessHeartbeats is a gRPC filter which returns errors on all + // attempts to heartbeat n1's liveness when suspendN1Heartbeats is set. + rejectN1Heartbeats := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + if !suspendN1Heartbeats.Get() { + return nil, nil + } + cput, ok := ba.GetArg(roachpb.ConditionalPut) + if !ok { + return nil, nil + } + cp := cput.(*roachpb.ConditionalPutRequest) + if cp.Key.Equal(keys.NodeLivenessKey(1)) { + br := &roachpb.BatchResponse{} + br.Error = roachpb.NewErrorf("test rejecting request") + return br, nil + } + return nil, nil + } + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + NodeLiveness: nlKnobs, + Store: storeKnobs, + }, + }, + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: { + Knobs: base.TestingKnobs{ + NodeLiveness: nlKnobs, + Store: storeKnobs, + KVClient: &kvcoord.ClientTestingKnobs{ + RequestFilter: rejectN1Heartbeats, + }, + }, + }, + }, } + tc := serverutils.StartNewTestCluster(t, 2, clusterArgs) + defer tc.Stopper().Stop(ctx) + rngKey := tc.ScratchRange(t) + _, err := tc.AddReplicas(rngKey, tc.Target(1)) + require.NoError(t, err) - tc.store.SetDraining(true, nil /* reporter */) - tc.repl.mu.Lock() - pErr = <-tc.repl.requestLeaseLocked(ctx, status).C() - tc.repl.mu.Unlock() - _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError) - if !ok { - t.Fatalf("expected NotLeaseHolderError, not %v", pErr) - } - tc.store.SetDraining(false, nil /* reporter */) - // Newly undrained, leases work again. - if _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil { - t.Fatal(pErr) - } + s1 := tc.Server(0) + s2 := tc.Server(1) + store1, err := s1.GetStores().(*Stores).GetStore(s1.GetFirstStoreID()) + require.NoError(t, err) + store2, err := s2.GetStores().(*Stores).GetStore(s2.GetFirstStoreID()) + require.NoError(t, err) + + rd, err := tc.LookupRange(rngKey) + require.NoError(t, err) + r1, err := store1.GetReplica(rd.RangeID) + require.NoError(t, err) + status := r1.currentLeaseStatus(ctx) + require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status) + // We expect the lease to be valid, but don't check that because, under race, it might have + // expired already. + + // Stop n1's heartbeats and wait for the lease to expire. + + log.Infof(ctx, "test: suspending heartbeats for n1") + suspendN1Heartbeats.Set(true) + + require.NoError(t, err) + testutils.SucceedsSoon(t, func() error { + status := r1.currentLeaseStatus(ctx) + require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status) + if status.State == kvserverpb.LeaseState_VALID { + return errors.New("lease still valid") + } + // We need to wait for the stasis state to pass too; during stasis other + // replicas can't take the lease. + if status.State == kvserverpb.LeaseState_STASIS { + return errors.New("lease still in stasis") + } + return nil + }) + + require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()), + "expected leadership to still be on the first replica") + + // Mark the stores as draining. We'll then start checking how acquiring leases + // behaves while draining. + store1.draining.Store(true) + store2.draining.Store(true) + + r2, err := store2.GetReplica(rd.RangeID) + require.NoError(t, err) + // Check that a draining replica that's not the leader does NOT take the + // lease. + _, pErr := r2.redirectOnOrAcquireLease(ctx) + require.NotNil(t, pErr) + require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail()) + + // Now transfer the leadership from r1 to r2 and check that r1 can now acquire + // the lease. + + // Initiate the leadership transfer. + r1.mu.Lock() + r1.mu.internalRaftGroup.TransferLeader(uint64(r2.ReplicaID())) + r1.mu.Unlock() + // Run the range through the Raft scheduler, otherwise the leadership messages + // doesn't get sent because the range is quiesced. + store1.EnqueueRaftUpdateCheck(r1.RangeID) + + // Wait for the leadership transfer to happen. + testutils.SucceedsSoon(t, func() error { + if r2.RaftStatus().SoftState.RaftState != raft.StateLeader { + return errors.Newf("r1 not yet leader") + } + return nil + }) + + // Check that r2 can now acquire the lease. + _, pErr = r2.redirectOnOrAcquireLease(ctx) + require.NoError(t, pErr.GoError()) } // TestReplicaGossipFirstRange verifies that the first range gossips its diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 6b3ab1ec436d..12db416025ed 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -119,6 +119,11 @@ type TestClusterInterface interface { // ReplicationMode returns the ReplicationMode that the test cluster was // configured with. ReplicationMode() base.TestClusterReplicationMode + + // ScratchRange returns the start key of a span of keyspace suitable for use + // as kv scratch space (it doesn't overlap system spans or SQL tables). The + // range is lazily split off on the first call to ScratchRange. + ScratchRange(t testing.TB) roachpb.Key } // TestClusterFactory encompasses the actual implementation of the shim