From c22dc1355880fad0effcab554344605f21142f17 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 28 Oct 2024 12:39:41 -0400
Subject: [PATCH] kv: redirect requests to raft leader who holds leader lease

Informs #132762.

This commit updates the lease status logic to redirect requests to the
raft leader who holds a leader lease when evaluating a request on a
follower. Previously, the follower would return a NotLeaseHolderError,
but it would not include the lease in the response, so the client would
not be immediately redirected to the leader. Instead, it would continue
trying each replica in order, eventually throwing a `sending to all
replicas failed` error. Furthermore, since no lease was included,
request proxying would never be used.

This explains why the `failover/partial/lease-gateway` test was failing
to recover. With this change, the test now observes recovery in 7.650s.

To demonstrate that this change works as expected, the commit also ports
over two unit tests that exercise request proxying to run with leader
leases. Without this change, they fail. With it, they pass.

Release note: None
---
 .../kvcoord/dist_sender_server_test.go        | 220 ++++++++++--------
 pkg/kv/kvserver/kvserverpb/lease_status.proto |  22 +-
 pkg/kv/kvserver/leases/status.go              |   3 -
 pkg/kv/kvserver/replica_range_lease.go        |  23 +-
 pkg/kv/kvserver/replica_store_liveness.go     |   2 +
 5 files changed, 165 insertions(+), 105 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index c38c36264fe6..d83149539f03 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4698,16 +4698,26 @@ func TestPartialPartition(t *testing.T) {
 		{false, 3, [][2]roachpb.NodeID{{1, 2}}},
 	}
 	for _, test := range testCases {
-		t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
-			func(t *testing.T) {
+		t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers), func(t *testing.T) {
+			testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
 				st := cluster.MakeTestingClusterSettings()
 				kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
-				// With epoch leases this test doesn't work reliably. It passes
-				// in cases where it should fail and fails in cases where it
-				// should pass.
-				// TODO(baptist): Attempt to pin the liveness leaseholder to
-				// node 3 to make epoch leases reliable.
-				kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)
+				switch leaseType {
+				case roachpb.LeaseExpiration:
+					kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)
+				case roachpb.LeaseEpoch:
+					// With epoch leases this test doesn't work reliably. It passes
+					// in cases where it should fail and fails in cases where it
+					// should pass.
+					// TODO(baptist): Attempt to pin the liveness leaseholder to
+					// node 3 to make epoch leases reliable.
+ skip.IgnoreLint(t, "flaky with epoch leases") + case roachpb.LeaseLeader: + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) + kvserver.RaftLeaderFortificationFractionEnabled.Override(ctx, &st.SV, 1.0) + default: + t.Fatalf("unknown lease type %s", leaseType) + } kvserver.RangefeedEnabled.Override(ctx, &st.SV, true) kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond) closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond) @@ -4801,6 +4811,7 @@ func TestPartialPartition(t *testing.T) { tc.Stopper().Stop(ctx) }) + }) } } @@ -4812,104 +4823,121 @@ func TestProxyTracing(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - const numServers = 3 - const numRanges = 3 - st := cluster.MakeTestingClusterSettings() - kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) - kvserver.RangefeedEnabled.Override(ctx, &st.SV, true) - kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond) - closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond) - closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond) - - var p rpc.Partitioner - tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ - ServerArgsPerNode: func() map[int]base.TestServerArgs { - perNode := make(map[int]base.TestServerArgs) - for i := 0; i < numServers; i++ { - ctk := rpc.ContextTestingKnobs{} - // Partition between n1 and n3. - p.RegisterTestingKnobs(roachpb.NodeID(i+1), [][2]roachpb.NodeID{{1, 3}}, &ctk) - perNode[i] = base.TestServerArgs{ - Settings: st, - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - ContextTestingKnobs: ctk, + testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) { + const numServers = 3 + const numRanges = 3 + st := cluster.MakeTestingClusterSettings() + switch leaseType { + case roachpb.LeaseExpiration: + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) + case roachpb.LeaseEpoch: + // With epoch leases this test doesn't work reliably. It passes + // in cases where it should fail and fails in cases where it + // should pass. + // TODO(baptist): Attempt to pin the liveness leaseholder to + // node 3 to make epoch leases reliable. + skip.IgnoreLint(t, "flaky with epoch leases") + case roachpb.LeaseLeader: + kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) + kvserver.RaftLeaderFortificationFractionEnabled.Override(ctx, &st.SV, 1.0) + default: + t.Fatalf("unknown lease type %s", leaseType) + } + kvserver.RangefeedEnabled.Override(ctx, &st.SV, true) + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond) + closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond) + + var p rpc.Partitioner + tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ + ServerArgsPerNode: func() map[int]base.TestServerArgs { + perNode := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + ctk := rpc.ContextTestingKnobs{} + // Partition between n1 and n3. + p.RegisterTestingKnobs(roachpb.NodeID(i+1), [][2]roachpb.NodeID{{1, 3}}, &ctk) + perNode[i] = base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ContextTestingKnobs: ctk, + }, }, - }, + } } - } - return perNode - }(), - }) - defer tc.Stopper().Stop(ctx) - - // Set up the mapping after the nodes have started and we have their - // addresses. 
- for i := 0; i < numServers; i++ { - g := tc.Servers[i].StorageLayer().GossipI().(*gossip.Gossip) - addr := g.GetNodeAddr().String() - nodeID := g.NodeID.Get() - p.RegisterNodeAddr(addr, nodeID) - } - - conn := tc.Conns[0] + return perNode + }(), + }) + defer tc.Stopper().Stop(ctx) + + // Set up the mapping after the nodes have started and we have their + // addresses. + for i := 0; i < numServers; i++ { + g := tc.Servers[i].StorageLayer().GossipI().(*gossip.Gossip) + addr := g.GetNodeAddr().String() + nodeID := g.NodeID.Get() + p.RegisterNodeAddr(addr, nodeID) + } - // Create a table and pin the leaseholder replicas to n3. The partition - // between n1 and n3 will lead to re-routing via n2, which we expect captured - // in the trace. - _, err := conn.Exec("CREATE TABLE t (i INT)") - require.NoError(t, err) - _, err = conn.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=3, lease_preferences='[[+dc=dc3]]', constraints='[]'") - require.NoError(t, err) - _, err = conn.Exec( - fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1)) - require.NoError(t, err) - _, err = conn.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t") - require.NoError(t, err) - require.NoError(t, tc.WaitForFullReplication()) + conn := tc.Conns[0] - leaseCount := func(node int) int { - var count int - err := conn.QueryRow(fmt.Sprintf( - "SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node), - ).Scan(&count) + // Create a table and pin the leaseholder replicas to n3. The partition + // between n1 and n3 will lead to re-routing via n2, which we expect captured + // in the trace. + _, err := conn.Exec("CREATE TABLE t (i INT)") require.NoError(t, err) - return count - } + _, err = conn.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=3, lease_preferences='[[+dc=dc3]]', constraints='[]'") + require.NoError(t, err) + _, err = conn.Exec( + fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1)) + require.NoError(t, err) + _, err = conn.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t") + require.NoError(t, err) + require.NoError(t, tc.WaitForFullReplication()) - checkLeaseCount := func(node, expectedLeaseCount int) error { - if count := leaseCount(node); count != expectedLeaseCount { - require.NoError(t, tc.GetFirstStoreFromServer(t, 0). - ForceLeaseQueueProcess()) - return errors.Errorf("expected %d leases on node %d, found %d", - expectedLeaseCount, node, count) + leaseCount := func(node int) int { + var count int + err := conn.QueryRow(fmt.Sprintf( + "SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node), + ).Scan(&count) + require.NoError(t, err) + return count } - return nil - } - // Wait until the leaseholder for the test table ranges are on n3. - testutils.SucceedsSoon(t, func() error { - return checkLeaseCount(3, numRanges) - }) + checkLeaseCount := func(node, expectedLeaseCount int) error { + if count := leaseCount(node); count != expectedLeaseCount { + require.NoError(t, tc.GetFirstStoreFromServer(t, 0). + ForceLeaseQueueProcess()) + return errors.Errorf("expected %d leases on node %d, found %d", + expectedLeaseCount, node, count) + } + return nil + } - p.EnablePartition(true) + // Wait until the leaseholder for the test table ranges are on n3. 
+ testutils.SucceedsSoon(t, func() error { + return checkLeaseCount(3, numRanges) + }) - _, err = conn.Exec("SET TRACING = on; SELECT FROM t where i = 987654321; SET TRACING = off") - require.NoError(t, err) + p.EnablePartition(true) - // Expect the "proxy request complete" message to be in the trace and that it - // comes from the proxy node n2. - var msg, tag, loc string - if err = conn.QueryRowContext(ctx, `SELECT message, tag, location - FROM [SHOW TRACE FOR SESSION] - WHERE message LIKE '%proxy request complete%' - AND location LIKE '%server/node%' - AND tag LIKE '%n2%'`, - ).Scan(&msg, &tag, &loc); err != nil { - if errors.Is(err, gosql.ErrNoRows) { - t.Fatalf("request succeeded without proxying") + _, err = conn.Exec("SET TRACING = on; SELECT FROM t where i = 987654321; SET TRACING = off") + require.NoError(t, err) + + // Expect the "proxy request complete" message to be in the trace and that it + // comes from the proxy node n2. + var msg, tag, loc string + if err = conn.QueryRowContext(ctx, `SELECT message, tag, location + FROM [SHOW TRACE FOR SESSION] + WHERE message LIKE '%proxy request complete%' + AND location LIKE '%server/node%' + AND tag LIKE '%n2%'`, + ).Scan(&msg, &tag, &loc); err != nil { + if errors.Is(err, gosql.ErrNoRows) { + t.Fatalf("request succeeded without proxying") + } + t.Fatal(err) } - t.Fatal(err) - } - t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc) + t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc) + }) } diff --git a/pkg/kv/kvserver/kvserverpb/lease_status.proto b/pkg/kv/kvserver/kvserverpb/lease_status.proto index 2248908613c1..4a492d7a31c5 100644 --- a/pkg/kv/kvserver/kvserverpb/lease_status.proto +++ b/pkg/kv/kvserver/kvserverpb/lease_status.proto @@ -14,7 +14,27 @@ import "util/hlc/timestamp.proto"; import "gogoproto/gogo.proto"; enum LeaseState { - // ERROR indicates that the lease can't be used or acquired. + // ERROR indicates that the lease can't be used or acquired. The state is + // not a definitive indication of the lease's validity. Rather, it is an + // indication that the validity is indeterminate; it may be valid or it + // may not be. + // + // The ERROR state is returned in the following cases: + // 1. An epoch lease has a reference to a node liveness record which the + // lease status evaluator is not aware of. This can happen when gossip + // is down and node liveness information is not available for the lease + // holder. In such cases, it would be unsafe to use the lease because + // the evaluator cannot determine the lease's expiration, so it may + // have expired. However, it would also be unsafe to replace the lease, + // because it may still be valid. + // 2. A leader lease is evaluated on a replica that is not the raft leader + // and is not aware of a successor raft leader at a future term (either + // itself or some other replica). In such cases, the lease may be valid + // or it may have expired. The raft leader (+leaseholder) itself would + // be able to tell, but the only way for a follower replica to tell is + // for it to try to become the raft leader, while respecting raft + // fortification rules. In the meantime, it is best for the follower + // replica to redirect any requests to the raft leader (+leaseholder). ERROR = 0; // VALID indicates that the lease is not expired at the current clock // time and can be used to serve a given request. 
diff --git a/pkg/kv/kvserver/leases/status.go b/pkg/kv/kvserver/leases/status.go index 64cf08c6b9cb..69b340e3cb56 100644 --- a/pkg/kv/kvserver/leases/status.go +++ b/pkg/kv/kvserver/leases/status.go @@ -178,9 +178,6 @@ func Status(ctx context.Context, nl NodeLiveness, i StatusInput) kvserverpb.Leas // to replace it. knownSuccessor := i.RaftStatus.Term > lease.Term && i.RaftStatus.Lead != raft.None if !knownSuccessor { - // TODO(nvanbenschoten): we could introduce a new INDETERMINATE state - // for this case, instead of using ERROR. This would look a bit less - // unexpected. status.State = kvserverpb.LeaseState_ERROR status.ErrInfo = "leader lease is not held locally, cannot determine validity" return status diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 3e0e2f1b9b07..258315c38752 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -1268,12 +1268,25 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest( msg = "lease state could not be determined" } log.VEventf(ctx, 2, "%s", msg) - // TODO(nvanbenschoten): now that leader leases are going to return an - // ERROR status on follower replicas instead of a VALID status, we will - // hit this path more. Do we need to add the lease to this - // NotLeaseHolder error to ensure fast redirection? + // If the lease state could not be determined as valid or invalid, then + // we return an error to redirect the request to the replica pointed to + // by the lease record. We don't know for sure who the leaseholder is, + // but that replica is still the best bet. + // + // However, we only do this if the lease is not owned by the local store + // who is currently struggling to evaluate the validity of the lease. + // This avoids self-redirection, which might prevent the client from + // trying other replicas. + // + // TODO(nvanbenschoten): this self-redirection case only happens with + // epoch-based leases, so we can remove this logic when we remove that + // lease type. + var holder roachpb.Lease + if !status.Lease.OwnedBy(r.store.StoreID()) { + holder = status.Lease + } return nil, kvserverpb.LeaseStatus{}, false, kvpb.NewError( - kvpb.NewNotLeaseHolderError(roachpb.Lease{}, r.store.StoreID(), r.shMu.state.Desc, msg)) + kvpb.NewNotLeaseHolderError(holder, r.store.StoreID(), r.shMu.state.Desc, msg)) case kvserverpb.LeaseState_VALID, kvserverpb.LeaseState_UNUSABLE: if !status.Lease.OwnedBy(r.store.StoreID()) { diff --git a/pkg/kv/kvserver/replica_store_liveness.go b/pkg/kv/kvserver/replica_store_liveness.go index 7f879265da50..fe0f518f431d 100644 --- a/pkg/kv/kvserver/replica_store_liveness.go +++ b/pkg/kv/kvserver/replica_store_liveness.go @@ -21,6 +21,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) +// RaftLeaderFortificationFractionEnabled controls the fraction of ranges for +// which the raft leader fortification protocol is enabled. var RaftLeaderFortificationFractionEnabled = settings.RegisterFloatSetting( settings.SystemOnly, "kv.raft.leader_fortification.fraction_enabled",
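
A minimal Go sketch of the redirection rule that the replica_range_lease.go hunk above implements: when a replica cannot determine whether the current lease is valid, it attaches the lease record it knows about to the NotLeaseHolderError as a hint, unless that lease names the local store itself, since self-redirection would only send the client back to the replica that is already unable to decide. With the hint present, the client can redirect or proxy straight to the presumed leaseholder instead of sweeping all replicas. All types and names below (StoreID, Lease, NotLeaseHolderError, redirectError) are hypothetical stand-ins for illustration, not the real kvpb/roachpb definitions.

// redirect_sketch.go: simplified stand-ins for the lease and error types; all
// names here are hypothetical and exist only to illustrate the rule above.
package main

import "fmt"

// StoreID identifies a store, mirroring the role of a store identifier.
type StoreID int

// Lease is a simplified stand-in for a range lease record.
type Lease struct {
	OwnerStoreID StoreID // store that the lease record names as leaseholder
}

// Empty reports whether the lease record is unset.
func (l Lease) Empty() bool { return l.OwnerStoreID == 0 }

// NotLeaseHolderError is a simplified stand-in for the error a replica returns
// when it cannot serve a request. When Lease is non-empty, the client can
// redirect (or proxy) straight to the replica it names instead of trying every
// replica in order.
type NotLeaseHolderError struct {
	Lease     Lease   // best-guess leaseholder, if known
	FromStore StoreID // replica that rejected the request
	Msg       string
}

func (e *NotLeaseHolderError) Error() string {
	if e.Lease.Empty() {
		return fmt.Sprintf("s%d: %s (no leaseholder hint)", e.FromStore, e.Msg)
	}
	return fmt.Sprintf("s%d: %s; try s%d", e.FromStore, e.Msg, e.Lease.OwnerStoreID)
}

// redirectError builds the error returned when lease validity cannot be
// determined locally. The lease record is included as a hint unless it points
// at the local store, which would only redirect the client back to the replica
// that is already unable to decide.
func redirectError(localStore StoreID, lease Lease) *NotLeaseHolderError {
	var hint Lease
	if lease.OwnerStoreID != localStore {
		hint = lease
	}
	return &NotLeaseHolderError{
		Lease:     hint,
		FromStore: localStore,
		Msg:       "lease state could not be determined",
	}
}

func main() {
	// A follower on s2 sees a leader lease held by s3: hint redirection to s3.
	fmt.Println(redirectError(2, Lease{OwnerStoreID: 3}))
	// The lease points at the local store: omit the hint to avoid
	// self-redirection.
	fmt.Println(redirectError(3, Lease{OwnerStoreID: 3}))
}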