kv: redirect requests to raft leader who holds leader lease
Informs cockroachdb#132762.

This commit updates the lease status logic to redirect requests to the raft
leader who holds a leader lease when evaluating a request on a follower.
Previously, the follower would return a NotLeaseHolderError, but it would not
include the lease in the response, so the client would not be immediately
redirected to the leader. Instead, it would continue trying each replica in
order, eventually throwing a `sending to all replicas failed` error.

Furthermore, since no lease was included, request proxying would never be used.
This explains why the `failover/partial/lease-gateway` test was failing to
recover. With this change, the test now observes recovery in 7.650s.

To demonstrate that this change works as expected, the commit also ports over
two unit tests that exercise request proxying to run with leader leases. Without
this change, they fail. With it, they pass.

Release note: None
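
For illustration, here is a minimal, self-contained Go sketch of the client-side behavior this enables. The types and the send loop are simplified stand-ins rather than the actual DistSender code; the point is only that an error carrying a lease lets the sender jump straight to the indicated replica instead of exhausting the transport in order.

    package redirect

    import "errors"

    // Simplified stand-ins for the real kvpb/roachpb types.
    type ReplicaID int

    type Lease struct {
        Replica ReplicaID // the replica believed to hold the lease
    }

    // NotLeaseHolderError mirrors the shape of the real error: when Lease is
    // non-nil, it names the replica the client should try next.
    type NotLeaseHolderError struct {
        Lease *Lease
    }

    func (e *NotLeaseHolderError) Error() string { return "not lease holder" }

    // send tries replicas in transport order, but whenever an error includes a
    // lease hint it moves that replica to the front of the queue. Without the
    // hint (the old behavior for leader leases), the loop simply exhausts
    // every replica and gives up.
    func send(replicas []ReplicaID, trySend func(ReplicaID) error) error {
        tried := make(map[ReplicaID]bool)
        queue := append([]ReplicaID(nil), replicas...)
        for len(queue) > 0 {
            r := queue[0]
            queue = queue[1:]
            if tried[r] {
                continue
            }
            tried[r] = true
            err := trySend(r)
            if err == nil {
                return nil
            }
            var nlhe *NotLeaseHolderError
            if errors.As(err, &nlhe) && nlhe.Lease != nil {
                // Redirect immediately to the replica named in the lease hint.
                queue = append([]ReplicaID{nlhe.Lease.Replica}, queue...)
            }
        }
        return errors.New("sending to all replicas failed")
    }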
nvanbenschoten committed Oct 29, 2024
1 parent f759a46 commit c22dc13
Showing 5 changed files with 165 additions and 105 deletions.
220 changes: 124 additions & 96 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -4698,16 +4698,26 @@ func TestPartialPartition(t *testing.T) {
{false, 3, [][2]roachpb.NodeID{{1, 2}}},
}
for _, test := range testCases {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers),
func(t *testing.T) {
t.Run(fmt.Sprintf("%t-%d", test.useProxy, test.numServers), func(t *testing.T) {
testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
st := cluster.MakeTestingClusterSettings()
kvcoord.ProxyBatchRequest.Override(ctx, &st.SV, test.useProxy)
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
// TODO(baptist): Attempt to pin the liveness leaseholder to
// node 3 to make epoch leases reliable.
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)
switch leaseType {
case roachpb.LeaseExpiration:
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)
case roachpb.LeaseEpoch:
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
// TODO(baptist): Attempt to pin the liveness leaseholder to
// node 3 to make epoch leases reliable.
skip.IgnoreLint(t, "flaky with epoch leases")
case roachpb.LeaseLeader:
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false)
kvserver.RaftLeaderFortificationFractionEnabled.Override(ctx, &st.SV, 1.0)
default:
t.Fatalf("unknown lease type %s", leaseType)
}
kvserver.RangefeedEnabled.Override(ctx, &st.SV, true)
kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond)
closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond)
@@ -4801,6 +4811,7 @@ func TestPartialPartition(t *testing.T) {

tc.Stopper().Stop(ctx)
})
})
}
}

@@ -4812,104 +4823,121 @@ func TestProxyTracing(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

const numServers = 3
const numRanges = 3
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)
kvserver.RangefeedEnabled.Override(ctx, &st.SV, true)
kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond)
closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond)
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond)

var p rpc.Partitioner
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
ServerArgsPerNode: func() map[int]base.TestServerArgs {
perNode := make(map[int]base.TestServerArgs)
for i := 0; i < numServers; i++ {
ctk := rpc.ContextTestingKnobs{}
// Partition between n1 and n3.
p.RegisterTestingKnobs(roachpb.NodeID(i+1), [][2]roachpb.NodeID{{1, 3}}, &ctk)
perNode[i] = base.TestServerArgs{
Settings: st,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ContextTestingKnobs: ctk,
testutils.RunValues(t, "lease-type", roachpb.LeaseTypes(), func(t *testing.T, leaseType roachpb.LeaseType) {
const numServers = 3
const numRanges = 3
st := cluster.MakeTestingClusterSettings()
switch leaseType {
case roachpb.LeaseExpiration:
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)
case roachpb.LeaseEpoch:
// With epoch leases this test doesn't work reliably. It passes
// in cases where it should fail and fails in cases where it
// should pass.
// TODO(baptist): Attempt to pin the liveness leaseholder to
// node 3 to make epoch leases reliable.
skip.IgnoreLint(t, "flaky with epoch leases")
case roachpb.LeaseLeader:
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false)
kvserver.RaftLeaderFortificationFractionEnabled.Override(ctx, &st.SV, 1.0)
default:
t.Fatalf("unknown lease type %s", leaseType)
}
kvserver.RangefeedEnabled.Override(ctx, &st.SV, true)
kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond)
closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond)
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond)

var p rpc.Partitioner
tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
ServerArgsPerNode: func() map[int]base.TestServerArgs {
perNode := make(map[int]base.TestServerArgs)
for i := 0; i < numServers; i++ {
ctk := rpc.ContextTestingKnobs{}
// Partition between n1 and n3.
p.RegisterTestingKnobs(roachpb.NodeID(i+1), [][2]roachpb.NodeID{{1, 3}}, &ctk)
perNode[i] = base.TestServerArgs{
Settings: st,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ContextTestingKnobs: ctk,
},
},
},
}
}
}
return perNode
}(),
})
defer tc.Stopper().Stop(ctx)

// Set up the mapping after the nodes have started and we have their
// addresses.
for i := 0; i < numServers; i++ {
g := tc.Servers[i].StorageLayer().GossipI().(*gossip.Gossip)
addr := g.GetNodeAddr().String()
nodeID := g.NodeID.Get()
p.RegisterNodeAddr(addr, nodeID)
}

conn := tc.Conns[0]
return perNode
}(),
})
defer tc.Stopper().Stop(ctx)

// Set up the mapping after the nodes have started and we have their
// addresses.
for i := 0; i < numServers; i++ {
g := tc.Servers[i].StorageLayer().GossipI().(*gossip.Gossip)
addr := g.GetNodeAddr().String()
nodeID := g.NodeID.Get()
p.RegisterNodeAddr(addr, nodeID)
}

// Create a table and pin the leaseholder replicas to n3. The partition
// between n1 and n3 will lead to re-routing via n2, which we expect captured
// in the trace.
_, err := conn.Exec("CREATE TABLE t (i INT)")
require.NoError(t, err)
_, err = conn.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=3, lease_preferences='[[+dc=dc3]]', constraints='[]'")
require.NoError(t, err)
_, err = conn.Exec(
fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1))
require.NoError(t, err)
_, err = conn.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t")
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())
conn := tc.Conns[0]

leaseCount := func(node int) int {
var count int
err := conn.QueryRow(fmt.Sprintf(
"SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node),
).Scan(&count)
// Create a table and pin the leaseholder replicas to n3. The partition
// between n1 and n3 will lead to re-routing via n2, which we expect captured
// in the trace.
_, err := conn.Exec("CREATE TABLE t (i INT)")
require.NoError(t, err)
return count
}
_, err = conn.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=3, lease_preferences='[[+dc=dc3]]', constraints='[]'")
require.NoError(t, err)
_, err = conn.Exec(
fmt.Sprintf("INSERT INTO t(i) select generate_series(1,%d)", numRanges-1))
require.NoError(t, err)
_, err = conn.Exec("ALTER TABLE t SPLIT AT SELECT i FROM t")
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())

checkLeaseCount := func(node, expectedLeaseCount int) error {
if count := leaseCount(node); count != expectedLeaseCount {
require.NoError(t, tc.GetFirstStoreFromServer(t, 0).
ForceLeaseQueueProcess())
return errors.Errorf("expected %d leases on node %d, found %d",
expectedLeaseCount, node, count)
leaseCount := func(node int) int {
var count int
err := conn.QueryRow(fmt.Sprintf(
"SELECT count(*) FROM [SHOW RANGES FROM TABLE t WITH DETAILS] WHERE lease_holder = %d", node),
).Scan(&count)
require.NoError(t, err)
return count
}
return nil
}

// Wait until the leaseholder for the test table ranges are on n3.
testutils.SucceedsSoon(t, func() error {
return checkLeaseCount(3, numRanges)
})
checkLeaseCount := func(node, expectedLeaseCount int) error {
if count := leaseCount(node); count != expectedLeaseCount {
require.NoError(t, tc.GetFirstStoreFromServer(t, 0).
ForceLeaseQueueProcess())
return errors.Errorf("expected %d leases on node %d, found %d",
expectedLeaseCount, node, count)
}
return nil
}

p.EnablePartition(true)
// Wait until the leaseholder for the test table ranges are on n3.
testutils.SucceedsSoon(t, func() error {
return checkLeaseCount(3, numRanges)
})

_, err = conn.Exec("SET TRACING = on; SELECT FROM t where i = 987654321; SET TRACING = off")
require.NoError(t, err)
p.EnablePartition(true)

// Expect the "proxy request complete" message to be in the trace and that it
// comes from the proxy node n2.
var msg, tag, loc string
if err = conn.QueryRowContext(ctx, `SELECT message, tag, location
FROM [SHOW TRACE FOR SESSION]
WHERE message LIKE '%proxy request complete%'
AND location LIKE '%server/node%'
AND tag LIKE '%n2%'`,
).Scan(&msg, &tag, &loc); err != nil {
if errors.Is(err, gosql.ErrNoRows) {
t.Fatalf("request succeeded without proxying")
_, err = conn.Exec("SET TRACING = on; SELECT FROM t where i = 987654321; SET TRACING = off")
require.NoError(t, err)

// Expect the "proxy request complete" message to be in the trace and that it
// comes from the proxy node n2.
var msg, tag, loc string
if err = conn.QueryRowContext(ctx, `SELECT message, tag, location
FROM [SHOW TRACE FOR SESSION]
WHERE message LIKE '%proxy request complete%'
AND location LIKE '%server/node%'
AND tag LIKE '%n2%'`,
).Scan(&msg, &tag, &loc); err != nil {
if errors.Is(err, gosql.ErrNoRows) {
t.Fatalf("request succeeded without proxying")
}
t.Fatal(err)
}
t.Fatal(err)
}
t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc)
t.Logf("found trace event; msg=%s, tag=%s, loc=%s", msg, tag, loc)
})
}
22 changes: 21 additions & 1 deletion pkg/kv/kvserver/kvserverpb/lease_status.proto
@@ -14,7 +14,27 @@ import "util/hlc/timestamp.proto";
import "gogoproto/gogo.proto";

enum LeaseState {
// ERROR indicates that the lease can't be used or acquired.
// ERROR indicates that the lease can't be used or acquired. The state is
// not a definitive indication of the lease's validity. Rather, it is an
// indication that the validity is indeterminate; it may be valid or it
// may not be.
//
// The ERROR state is returned in the following cases:
// 1. An epoch lease has a reference to a node liveness record which the
// lease status evaluator is not aware of. This can happen when gossip
// is down and node liveness information is not available for the lease
// holder. In such cases, it would be unsafe to use the lease because
// the evaluator cannot determine the lease's expiration, so it may
// have expired. However, it would also be unsafe to replace the lease,
// because it may still be valid.
// 2. A leader lease is evaluated on a replica that is not the raft leader
// and is not aware of a successor raft leader at a future term (either
// itself or some other replica). In such cases, the lease may be valid
// or it may have expired. The raft leader (+leaseholder) itself would
// be able to tell, but the only way for a follower replica to tell is
// for it to try to become the raft leader, while respecting raft
// fortification rules. In the meantime, it is best for the follower
// replica to redirect any requests to the raft leader (+leaseholder).
ERROR = 0;
// VALID indicates that the lease is not expired at the current clock
// time and can be used to serve a given request.
3 changes: 0 additions & 3 deletions pkg/kv/kvserver/leases/status.go
@@ -178,9 +178,6 @@ func Status(ctx context.Context, nl NodeLiveness, i StatusInput) kvserverpb.Leas
// to replace it.
knownSuccessor := i.RaftStatus.Term > lease.Term && i.RaftStatus.Lead != raft.None
if !knownSuccessor {
// TODO(nvanbenschoten): we could introduce a new INDETERMINATE state
// for this case, instead of using ERROR. This would look a bit less
// unexpected.
status.State = kvserverpb.LeaseState_ERROR
status.ErrInfo = "leader lease is not held locally, cannot determine validity"
return status
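
To make the follower-side decision above easier to read in isolation, here is a condensed, self-contained sketch of it. The types, function name, and returned strings are local stand-ins (they only loosely mirror the kvserver signatures and the LeaseState enum); the knownSuccessor condition matches the one in the hunk above.

    package leasestatus

    // raftStatus is a stand-in for the subset of raft state the evaluator needs.
    type raftStatus struct {
        Term uint64 // highest raft term this replica has observed
        Lead uint64 // leader known at that term; 0 (raft.None) if unknown
    }

    // followerLeaderLeaseState captures only the follower-side decision: the
    // raft leader itself can evaluate its own lease directly and is not
    // modeled here.
    func followerLeaderLeaseState(leaseTerm uint64, rs raftStatus) string {
        // A successor raft leader at a later term proves the lease has been
        // superseded, so it is safe to treat it as invalid and try to
        // replace it.
        knownSuccessor := rs.Term > leaseTerm && rs.Lead != 0
        if knownSuccessor {
            return "EXPIRED"
        }
        // Otherwise validity is indeterminate: report ERROR and, with this
        // commit, redirect the request to the raft leader (+leaseholder)
        // named in the lease record.
        return "ERROR"
    }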
23 changes: 18 additions & 5 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -1268,12 +1268,25 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
msg = "lease state could not be determined"
}
log.VEventf(ctx, 2, "%s", msg)
// TODO(nvanbenschoten): now that leader leases are going to return an
// ERROR status on follower replicas instead of a VALID status, we will
// hit this path more. Do we need to add the lease to this
// NotLeaseHolder error to ensure fast redirection?
// If the lease state could not be determined as valid or invalid, then
// we return an error to redirect the request to the replica pointed to
// by the lease record. We don't know for sure who the leaseholder is,
// but that replica is still the best bet.
//
// However, we only do this if the lease is not owned by the local store
// who is currently struggling to evaluate the validity of the lease.
// This avoids self-redirection, which might prevent the client from
// trying other replicas.
//
// TODO(nvanbenschoten): this self-redirection case only happens with
// epoch-based leases, so we can remove this logic when we remove that
// lease type.
var holder roachpb.Lease
if !status.Lease.OwnedBy(r.store.StoreID()) {
holder = status.Lease
}
return nil, kvserverpb.LeaseStatus{}, false, kvpb.NewError(
kvpb.NewNotLeaseHolderError(roachpb.Lease{}, r.store.StoreID(), r.shMu.state.Desc, msg))
kvpb.NewNotLeaseHolderError(holder, r.store.StoreID(), r.shMu.state.Desc, msg))

case kvserverpb.LeaseState_VALID, kvserverpb.LeaseState_UNUSABLE:
if !status.Lease.OwnedBy(r.store.StoreID()) {
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_store_liveness.go
@@ -21,6 +21,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// RaftLeaderFortificationFractionEnabled controls the fraction of ranges for
// which the raft leader fortification protocol is enabled.
var RaftLeaderFortificationFractionEnabled = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.raft.leader_fortification.fraction_enabled",
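
As a usage note, the two overrides below (taken verbatim from the tests earlier in this diff) are how a test cluster opts into leader leases: expiration-only leasing is switched off and leader fortification is enabled for all ranges. The helper name and import paths are illustrative assumptions, not part of this commit.

    package kvserver_test

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    )

    // makeLeaderLeaseSettings combines the two settings overrides the tests in
    // this diff use to run with leader leases.
    func makeLeaderLeaseSettings(ctx context.Context) *cluster.Settings {
        st := cluster.MakeTestingClusterSettings()
        kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false)
        kvserver.RaftLeaderFortificationFractionEnabled.Override(ctx, &st.SV, 1.0)
        return st
    }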
