kvserver: hoist early return out of tryReproposeWithNewLeaseIndex
This exposes the madness of this control flow for what it is. There is
no behavior change here. We were previously unconditionally overriding
`pErr`, potentially with a `nil`; this seems sketchy, but we get away
with it, and now we document it explicitly.

Epic: none
Release note: None
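
In essence, the commit hoists a guard that used to live inside the helper up
into its caller, and makes the previously implicit `pErr` override explicit.
A minimal sketch of the before/after control flow, using hypothetical
simplified names (`command`, `repropose`) rather than the actual kvserver
types:

package sketch

type command struct {
	applied    bool
	superseded bool
}

// repropose stands in for the real reproposal path.
func repropose(*command) error { return nil }

// Before: the helper hid an early return, so the caller's pErr was
// unconditionally overwritten, possibly with nil.
func handleRejectionBefore(cmd *command) (pErr error) {
	pErr = tryReproposeBefore(cmd) // may silently clobber pErr with nil
	return pErr
}

func tryReproposeBefore(cmd *command) error {
	if cmd.applied || cmd.superseded {
		return nil // hidden early return
	}
	return repropose(cmd)
}

// After: the guard lives in the caller, and the nil reset is explicit
// rather than a side effect of the helper.
func handleRejectionAfter(cmd *command) (pErr error) {
	pErr = nil // explicitly reset; the reproposal outcome (or its absence) decides
	if !cmd.applied && !cmd.superseded {
		pErr = tryReproposeAfter(cmd)
	}
	return pErr
}

func tryReproposeAfter(cmd *command) error {
	// The caller has already checked applied/superseded.
	return repropose(cmd)
}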
tbg committed Feb 24, 2023
1 parent c0b2a17 commit 548b2f3
Showing 1 changed file with 39 additions and 38 deletions.
77 changes: 39 additions & 38 deletions pkg/kv/kvserver/replica_application_result.go
@@ -104,11 +104,40 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
case kvserverbase.ProposalRejectionPermanent:
cmd.response.Err = pErr
case kvserverbase.ProposalRejectionIllegalLeaseIndex:
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))
// Reset the error as it's now going to be determined by the outcome of
// reproposing (or not).
//
// Note that if pErr remains nil, we will mark the proposal as non-local at
// the end of this block and return, so we're not hitting an NPE near the end
// of this method where we're attempting to reach into `cmd.proposal`.
//
// This control flow is sketchy but it preserves existing behavior
// that would be too risky to change at the time of writing.
//
pErr = nil
// If the command associated with this rejected raft entry already applied
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local, the first one will apply (and set p.applied), and the
// remaining copies will hit this branch.)
//
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
if !cmd.proposal.applied && !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))
}

if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
@@ -143,9 +172,9 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
// Unbind the entry's local proposal because we just succeeded in
// reproposing it or decided not to repropose. Either way, we don't want
// to acknowledge the client yet.
cmd.proposal = nil
return
}
@@ -207,40 +236,12 @@ func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHolderError {
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// Returns a nil error if the command has already been successfully applied or
// has been reproposed here or by a different entry for the same proposal that
// hit an illegal lease index error.
//
// If this returns a nil error once, it will return a nil error for future calls
// as well, assuming that trackEvaluatingRequest returns monotonically increasing
// timestamps across subsequent calls.
// The caller must already have checked that the entry is local and not
// superseded, and that it was rejected with an illegal lease index error.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
) *kvpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If the command associated with this rejected raft entry already applied
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local, the first one will apply (and set p.applied), and the
// remaining copies will hit this branch.)
//
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}

// We need to track the request again in order to protect its timestamp until
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
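
For context on the hoisted guard itself, here is a hedged sketch of the
semantics the `applied`/`Supersedes` check relies on, as described by the
comments in the diff above. The types and method below are hypothetical
simplifications, not the actual `ProposalData` implementation:

package sketch

type proposalData struct {
	applied       bool
	maxLeaseIndex uint64 // highest lease index this proposal was (re)proposed at
}

// supersedes reports whether the proposal has already been reproposed at a
// lease index higher than the one decoded from a given raft entry. If so,
// the older entry must not be reproposed again, preserving the invariant
// that at most one lease index per command can succeed in the raft log.
func (p *proposalData) supersedes(entryMaxLeaseIndex uint64) bool {
	return p.maxLeaseIndex > entryMaxLeaseIndex
}

// shouldRepropose mirrors the guard hoisted into prepareLocalResult: only
// repropose if the command neither applied nor was already reproposed at a
// higher lease index.
func shouldRepropose(p *proposalData, entryMaxLeaseIndex uint64) bool {
	return !p.applied && !p.supersedes(entryMaxLeaseIndex)
}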
