diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go index 63544da1696b..dfe701a72a01 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go @@ -92,12 +92,6 @@ func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Dur return getFollowerReadLag(st), nil } -// batchCanBeEvaluatedOnFollower determines if a batch consists exclusively of -// requests that can be evaluated on a follower replica. -func batchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { - return ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional() -} - // closedTimestampLikelySufficient determines if a request with a given required // frontier timestamp is likely to be below a follower's closed timestamp and // serviceable as a follower read were the request to be sent to a follower @@ -131,7 +125,7 @@ func canSendToFollower( ba roachpb.BatchRequest, ) bool { return checkFollowerReadsEnabled(clusterID, st) && - batchCanBeEvaluatedOnFollower(ba) && + kvserver.BatchCanBeEvaluatedOnFollower(ba) && closedTimestampLikelySufficient(st, clock, ctPolicy, ba.Txn.RequiredFrontier()) } diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 98bec175ea26..73fde553ed48 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -121,6 +121,16 @@ func TestCanSendToFollower(t *testing.T) { ba: batch(txn(stale), &roachpb.PutRequest{}), exp: false, }, + { + name: "stale heartbeat txn", + ba: batch(txn(stale), &roachpb.HeartbeatTxnRequest{}), + exp: false, + }, + { + name: "stale end txn", + ba: batch(txn(stale), &roachpb.EndTxnRequest{}), + exp: false, + }, { name: "stale non-txn request", ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), @@ -170,6 +180,18 @@ func TestCanSendToFollower(t *testing.T) { ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, exp: false, }, + { + name: "stale heartbeat txn, global reads policy", + ba: batch(txn(stale), &roachpb.HeartbeatTxnRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, + { + name: "stale end txn, global reads policy", + ba: batch(txn(stale), &roachpb.EndTxnRequest{}), + ctPolicy: roachpb.LEAD_FOR_GLOBAL_READS, + exp: false, + }, { name: "stale non-txn request, global reads policy", ba: batch(txn(stale), &roachpb.QueryTxnRequest{}), diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 8014cb82bfeb..e602af9193b0 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1243,6 +1243,7 @@ func (r *Replica) checkExecutionCanProceed( if !r.canServeFollowerReadRLocked(ctx, ba, err) { return st, err } + err = nil // ignoring error } } diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index ddfea460cc21..cdc35c0b3778 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -33,6 +33,25 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting( true, ).WithPublic() +// BatchCanBeEvaluatedOnFollower determines if a batch consists exclusively of +// requests that can be evaluated on a follower replica, given a sufficiently +// advanced closed timestamp. +func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { + // Explanation of conditions: + // 1. the batch needs to be part of a transaction, because non-transactional + // batches often rely on the server setting their timestamp. If a follower + // with a lagging clock sets their timestamp then they might miss past + // writes served at higher timestamps. + // 2. each request in the batch needs to be "transactional", because those are + // the only ones that have clearly defined semantics when served under the + // closed timestamp. + // 3. the batch needs to be read-only, because a follower replica cannot + // propose writes to Raft. + // 4. the batch needs to be non-locking, because unreplicated locks are only + // held on the leaseholder. + return ba.Txn != nil && ba.IsAllTransactional() && ba.IsReadOnly() && !ba.IsLocking() +} + // canServeFollowerReadRLocked tests, when a range lease could not be acquired, // whether the batch can be served as a follower read despite the error. Only // non-locking, read-only requests can be served as follower reads. The batch @@ -44,7 +63,7 @@ func (r *Replica) canServeFollowerReadRLocked( var lErr *roachpb.NotLeaseHolderError eligible := errors.As(err, &lErr) && lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch && - (ba.Txn != nil && !ba.IsLocking() && ba.IsAllTransactional()) && // followerreadsccl.batchCanBeEvaluatedOnFollower + BatchCanBeEvaluatedOnFollower(*ba) && FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) if !eligible { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ca0b62679e25..686294c3ee75 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -151,6 +151,10 @@ func (r *Replica) evalAndPropose( // Attach information about the proposer to the command. proposal.command.ProposerLeaseSequence = st.Lease.Sequence + // Perform a sanity check that the lease is owned by this replica. + if !st.Lease.OwnedBy(r.store.StoreID()) && !ba.IsLeaseRequest() { + log.Fatalf(ctx, "cannot propose %s on follower with remotely owned lease %s", ba, st.Lease) + } // Once a command is written to the raft log, it must be loaded into memory // and replayed on all replicas. If a command is too big, stop it here. If