Revert "kvserver: improve reproposal assertions and documentation"
Reverts cockroachdb#97973.

This reverts commit 866d58a.
tbg committed Mar 13, 2023
1 parent e23a2ea commit 939f955
Showing 6 changed files with 72 additions and 292 deletions.
156 changes: 9 additions & 147 deletions pkg/kv/kvserver/replica.go
@@ -501,13 +501,9 @@ type Replica struct {
// Instead, the buffer internally holds a reference to mu and will use
// it appropriately.
proposalBuf propBuf

// proposals stores the Raft in-flight commands which originated at this
// Replica, i.e. all commands for which propose has been called, but which
// have not yet applied. A proposal is "pending" until it is "finalized",
// meaning that `finishApplication` has been invoked on the proposal (which
// informs the client that the proposal has now been applied, optionally
// with an error, which may be an AmbiguousResultError).
// proposals stores the Raft in-flight commands which originated at
// this Replica, i.e. all commands for which propose has been called,
// but which have not yet applied.
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while the Replica.mu is held, except
@@ -516,146 +512,12 @@ type Replica struct {
// underneath raft. See comments on ProposalData fields for synchronization
// requirements.
//
// Due to Raft reproposals, multiple in-flight Raft entries can have the
// same CmdIDKey. There are two kinds of reproposals:
//
// (1) the exact same entry is handed to raft (possibly despite already being
// present in the log), usually after a timeout[^1].
//
// (2) an existing proposal is updated with a new MaxLeaseIndex and handed to
// raft, i.e. we're intentionally creating a duplicate. This exists because
// for pipelined proposals, the client's goroutine returns without waiting
// for the proposal to apply.[^2][^3] When (2) is carried out, the existing
// copies of the proposal in the log will be "Superseded", see below. Note
// that (2) will only be invoked for proposals that aren't currently in the
// proposals map any more because they're in the middle of being applied;
// as part of (2), they are re-added to the map.
//
// To understand reproposals, we need a broad overview of entry application,
// which is batched (i.e. multiple log entries may be processed and applied
// together). In entry application, the following steps are taken:
//
// 1. retrieve all local proposals: iterate through the entries in order,
// and look them up in the proposals map. For each "local" entry (i.e.
// tracked in the map), remove it from the map (unless the proposal
// supersedes the entry, see below) and attach the value to the entry.
// 2. for each entry:
// - stage written and in-memory effects of the entry (some may apply as no-ops
// if they fail below-raft checks such as the MaxLeaseIndex check)
// - If the MaxLeaseIndex check fails and additional constraints are
// satisfied, carry out (2) from above. On success, we know now that there
// will be a reproposal in the log that can successfully apply. We unbind
// the local proposal (so we don't signal it) and apply the current entry
// as a no-op.
// 3. carry out additional side effects of the entire batch (stats updates etc).
//
// A prerequisite for (2) is that there currently aren't any copies of the proposal
// in the log that may ultimately apply, or we risk doubly applying commands - a
// correctness bug. After (2), any copies of the entry present in the log will have
// a MaxLeaseIndex strictly less than that of the in-memory command, and will be
// Superseded() by it.
//
// We can always safely create an identical copy (i.e. (1)) because of the
// replay protection conferred by the MaxLeaseIndex - all but the first
// proposal (among those that reach consensus) will be rejected (i.e. apply
// as a no-op).
//
// Naively, one might hope that by invoking (2) upon applying an entry for
// a command that is rejected due to a MaxLeaseIndex violation, one could achieve the
// invariant that there is only ever one unapplied copy of the entry in the
// log, and then the in-memory proposal could reflect the MaxLeaseIndex
// assigned to this unapplied copy at all times.
//
// Unfortunately, for various reasons, this invariant does not hold:
// - entry application isn't durable, so upon a restart, we might roll
// back to a log position that has yet to catch up over multiple previous
// incarnations of (2), i.e. we will see the same entry multiple times at
// various MaxLeaseIndex values.
// (This is technically not a problem, since we're losing the in-memory proposal
// during the restart anyway, but it is worth keeping in mind.)
// - Raft proposal forwarding due to (1)-type reproposals could "in
// principle" lead to an old copy of the entry appearing again in the
// unapplied log, at least if we make the reasonable assumption that
// forwarded proposals may arrive at the leader with arbitrary delays.
//
// As a result, we can't "just" invoke (2) when seeing a rejected command;
// we additionally have to verify that there isn't a more recent reproposal
// underway that could apply successfully and supersedes the one we're
// currently looking at.
// So we carry out (2) only if the MaxLeaseIndex of the in-mem proposal matches
// that of the current entry and, if it does, we update the in-mem MaxLeaseIndex
// with the result of (2).
//
// An example follows. Consider the following situation (where N is some base
// index not relevant to the example) in which we have one inflight proposal which
// has been triplicated in the log (due to [^1]):
//
// proposals[id] = p{Cmd{MaxLeaseIndex: 100, ...}}
//
// ... (unrelated entries)
// raftlog[N] = Cmd{MaxLeaseIndex: 100, ...}
// ... (unrelated entries)
// raftlog[N+12] = (same as N)
// ... (unrelated entries)
// raftlog[N+15] = (same as N)
//
// where we assume that the `MaxLeaseIndex` 100 is invalid, i.e. when we see
// the first copy of the command being applied, we've already applied some
// command with equal or higher `MaxLeaseIndex`. In a world without
// mechanism (2), `N` would be rejected, and would finalize the proposal
// (i.e. signal the client with an error and remove the entry from
// `proposals`). Later, `N+12` and `N+15` would similarly be rejected (but
// they wouldn't even be regarded as local proposals any more due to not
// being present in `proposals`).
//
// However, (2) exists and it will engage during application of `N`: realizing
// that the current copies of the entry are all going to be rejected, it will
// modify the proposal by assigning a new `MaxLeaseIndex` to it, and handing
// it to `(*Replica).propose` again (which hands it to the proposal buffer,
// which will at some point flush it, leading to re-insertion into the raft
// log and the `proposals` map). The result will be this picture:
//
// proposals[id] = p{Cmd{MaxLeaseIndex: 192, ...}} <-- modified
//
// ... (unrelated entries)
// raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} <-- applied (as no-op)
// ... (unrelated entries)
// raftlog[N+12] = (same as N) (superseded)
// ... (unrelated entries)
// raftlog[N+15] = (same as N) (superseded)
// ... (unrelated entries)
// raftlog[N+18] = Cmd{MaxLeaseIndex: 192, ...} <-- modified
//
// `N+18` might (in fact, is likely to) apply successfully. As a result, when
// we consider `N+12` or `N+15` for application, we must *not* carry out (2)
// again, or we break replay protection. In other words, the `MaxLeaseIndex`
// of the command being applied must be compared with the `MaxLeaseIndex` of
// the command in the proposals map; only if they match do we know that this
// is the most recent (in MaxLeaseIndex order) copy of the command, and only
// then can (2) engage. In addition, an entry that doesn't pass this equality
// check must not signal the proposer or be unlinked from the proposals map
// (as a newer reproposal which might succeed is likely in the log)[^4].
//
// Another way of framing the above is that `proposals[id].Cmd.MaxLeaseIndex`
// actually tracks the maximum `MaxLeaseIndex` of all copies that may be present in
// the log.
//
// If (2) results in an error (for example, since the proposal now fails to
// respect the closed timestamp), that error will finalize the proposal and
// be returned to the client.
//
// [^1]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_raft.go#L1224
// [^2]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_application_result.go#L148
// [^3]: it's debatable how useful this below-raft reproposal mechanism is.
// It was introduced in https://github.com/cockroachdb/cockroach/pull/35261,
// and perhaps could be phased out again if we also did
// https://github.com/cockroachdb/cockroach/issues/21849. Historical
// evidence points to https://github.com/cockroachdb/cockroach/issues/28876
// as the motivation for introducing this mechanism, i.e. it was about
// reducing failure rates early in the life of a cluster when raft
// leaderships were being determined. Perhaps we could "simply" disable
// async writes unless leadership was stable instead, by blocking on the
// proposal anyway.
// [^4]: https://github.com/cockroachdb/cockroach/blob/ab6a8650621ae798377f12bbfc1eee2fbec95480/pkg/kv/kvserver/replica_application_decoder.go#L100-L114
// Due to Raft reproposals, multiple in-flight Raft entries can have
// the same CmdIDKey, all corresponding to the same KV request. However,
// not all Raft entries with a given command ID will correspond directly
// to the *RaftCommand contained in its associated *ProposalData. This
// is because the *RaftCommand can be mutated during reproposals by
// Replica.tryReproposeWithNewLeaseIndex.
proposals map[kvserverbase.CmdIDKey]*ProposalData
// Indicates that the replica is in the process of applying log entries.
// Updated to true in handleRaftReady before entries are removed from
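
The comment block removed above boils down to one invariant: the in-memory proposal's MaxLeaseIndex tracks the maximum MaxLeaseIndex across all copies of the command that may still be in the log, and a log entry is superseded exactly when it carries a smaller index. A minimal Go sketch of that rule, using a hypothetical simplified type rather than the actual ProposalData:

package main

import "fmt"

// proposal is a hypothetical stand-in for ProposalData: it records the
// highest MaxLeaseIndex ever assigned to the command, i.e. the maximum
// across all copies that may still be present in the raft log.
type proposal struct {
	maxLeaseIndex uint64
}

// supersedes reports whether the in-memory proposal has been reproposed
// (mechanism (2)) at a lease index higher than the one decoded from a log
// entry, in which case that entry is a stale copy and must apply as a no-op.
func (p *proposal) supersedes(entryMLI uint64) bool {
	return p.maxLeaseIndex > entryMLI
}

func main() {
	p := &proposal{maxLeaseIndex: 100}
	fmt.Println(p.supersedes(100)) // false: this entry is the latest copy

	// Mechanism (2) engages: the proposal is handed to raft again with a
	// fresh MaxLeaseIndex, superseding the copies at N, N+12 and N+15.
	p.maxLeaseIndex = 192
	fmt.Println(p.supersedes(100)) // true: the old copies apply as no-ops
}
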
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_application_decoder.go
@@ -111,8 +111,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
// criterion. While such proposals can be reproposed, only the first
// instance that gets applied matters and so removing the command is
// always what we want to happen.
!cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex)

cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
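
For illustration, the two forms of the shouldRemove predicate above agree as long as the in-memory MaxLeaseIndex only ever grows: a proposal stays in the map exactly while a newer reproposal may still be in flight. A hedged sketch with simplified stand-in types (Supersedes is reconstructed here from the reverted commit's description, not copied from the actual method):

package main

import "fmt"

type raftCommand struct{ MaxLeaseIndex uint64 }

type proposalData struct{ command raftCommand }

// Supersedes is a hypothetical reconstruction of the helper removed by this
// revert: the in-memory proposal supersedes a log entry iff it carries a
// strictly larger MaxLeaseIndex.
func (p *proposalData) Supersedes(entryMLI uint64) bool {
	return p.command.MaxLeaseIndex > entryMLI
}

func main() {
	p := &proposalData{command: raftCommand{MaxLeaseIndex: 192}}
	entryMLI := uint64(100) // decoded from a stale copy in the log

	// Pre-revert form of the predicate.
	fmt.Println(!p.Supersedes(entryMLI)) // false: keep the proposal mapped
	// Post-revert form; equivalent while the in-memory index only grows.
	fmt.Println(entryMLI == p.command.MaxLeaseIndex) // false: keep it mapped
}
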
110 changes: 28 additions & 82 deletions pkg/kv/kvserver/replica_application_result.go
@@ -108,8 +108,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))

pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
// entry is not superseded (i.e. we don't have a reproposal at a higher
@@ -134,14 +133,6 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// at time of writing, through TestReplicaReproposalWithNewLeaseIndexError).
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
cmd.response.Err = pErr

r.mu.RLock()
_, inMap := r.mu.proposals[cmd.ID]
r.mu.RUnlock()

if inMap {
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
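
The branch restored above has two outcomes: a failed reproposal finalizes the command with the error, while a successful one unbinds the local proposal so that applying this rejected entry does not acknowledge the client. A minimal sketch of that control flow, with hypothetical types standing in for replicatedCmd and ProposalData:

package main

import (
	"errors"
	"fmt"
)

// localProposal is a placeholder for ProposalData in this sketch.
type localProposal struct{}

// replicatedEntry is a hypothetical stand-in for replicatedCmd.
type replicatedEntry struct {
	proposal *localProposal // nil once unbound
	respErr  error
}

// handleReproposal mirrors the restored flow: on error, finalize the command
// with that error; on success, unbind the local proposal so the client is
// only signaled once the reproposed copy applies.
func handleReproposal(cmd *replicatedEntry, pErr error) {
	if pErr != nil {
		cmd.respErr = pErr // the client sees this error
		return
	}
	cmd.proposal = nil // don't ack the client from this rejected entry
}

func main() {
	ok := &replicatedEntry{proposal: &localProposal{}}
	handleReproposal(ok, nil)
	fmt.Println(ok.proposal == nil) // true: client waits for the reproposal

	failed := &replicatedEntry{proposal: &localProposal{}}
	handleReproposal(failed, errors.New("closed timestamp"))
	fmt.Println(failed.respErr != nil) // true: finalized with the error
}
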
@@ -166,77 +157,36 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

// reproposer is used by tryReproposeWithNewLeaseIndex.
type reproposer interface {
trackEvaluatingRequest(context.Context, hlc.Timestamp) (hlc.Timestamp, TrackedRequestToken)
propose(context.Context, *ProposalData, TrackedRequestToken) *kvpb.Error
newNotLeaseHolderError(string) *kvpb.NotLeaseHolderError
}

type replicaReproposer Replica

var _ reproposer = (*replicaReproposer)(nil)

func (r *replicaReproposer) trackEvaluatingRequest(
ctx context.Context, wts hlc.Timestamp,
) (hlc.Timestamp, TrackedRequestToken) {
// NB: must not hold r.mu here, the propBuf acquires it itself.
return r.mu.proposalBuf.TrackEvaluatingRequest(ctx, wts)
}

func (r *replicaReproposer) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) *kvpb.Error {
return (*Replica)(r).propose(ctx, p, tok)
}

func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHolderError {
r.mu.RLock()
defer r.mu.RUnlock()
return kvpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
msg,
)
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// Returns a nil error if the command has already been successfully applied or
// has been reproposed here or by a different entry for the same proposal that
// hit an illegal lease index error.
//
// If this returns a nil error once, it will return a nil error for future calls
// as well, assuming that trackEvaluatingRequest returns monotonically increasing
// timestamps across subsequent calls.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd,
) *kvpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If the command associated with this rejected raft entry already applied
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local, the first one will apply (and set p.applied) and the
// remaining copies will hit this branch).
if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
//
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}
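
To make the guard above concrete: when one application batch contains several copies of the same rejected command, only the copy whose MaxLeaseIndex still matches the in-memory proposal may trigger a reproposal; later copies see the bumped index and return early. A small sketch under those assumptions (simplified types, and a fixed index bump in place of the real propose path):

package main

import "fmt"

type entry struct{ mli uint64 } // MaxLeaseIndex decoded from the log entry

type prop struct {
	mli     uint64 // in-memory MaxLeaseIndex, the max across all copies
	applied bool
}

// maybeRepropose mirrors the restored guard: skip the reproposal if the
// proposal already applied, or if this entry is a stale copy whose index
// no longer matches the in-memory proposal's.
func maybeRepropose(e entry, p *prop) bool {
	if p.applied || e.mli != p.mli {
		return false // already applied or superseded: nothing to do
	}
	p.mli += 92 // stand-in for assigning a fresh MaxLeaseIndex via propose
	return true
}

func main() {
	p := &prop{mli: 100}
	// Three copies of the same rejected command in one batch, as in
	// TestReplicaRefreshMultiple: only the first copy reproposes.
	for _, e := range []entry{{100}, {100}, {100}} {
		fmt.Println(maybeRepropose(e, p)) // true, false, false
	}
}
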
@@ -245,31 +195,27 @@ func tryReproposeWithNewLeaseIndex(
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
// requests (e.g. EndTxn) don't care about closed timestamps.
minTS, tok := r.trackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go back to DistSender, so send something it can digest.
return kvpb.NewError(r.newNotLeaseHolderError("reproposal failed due to closed timestamp"))
// will go back to DistSender, so send something it can digest.
err := kvpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
return kvpb.NewError(err)
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

// Reset the command for reproposal.
prevMaxLeaseIndex := p.command.MaxLeaseIndex
prevEncodedCommand := p.encodedCommand
p.command.MaxLeaseIndex = 0
p.encodedCommand = nil
pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
// On error, reset the fields we zeroed out to their old value.
// This ensures that the proposal doesn't count as Superseded
// now.
p.command.MaxLeaseIndex = prevMaxLeaseIndex
p.encodedCommand = prevEncodedCommand
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x", cmd.ID)
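
The tail of the restored function does two things: it gives up when the tracker would force the write timestamp past a closed timestamp (returning a NotLeaseHolderError that DistSender can digest), and otherwise clears MaxLeaseIndex and the encoded bytes so that the proposal path assigns fresh ones on flush. A hedged sketch of that flow, with plain integers standing in for hlc.Timestamp and a callback in place of (*Replica).propose:

package main

import (
	"errors"
	"fmt"
)

var errNotLeaseHolder = errors.New("reproposal failed due to closed timestamp")

// proposalSketch is a hypothetical reduction of ProposalData.
type proposalSketch struct {
	writeTS       int64  // stand-in for the request's write timestamp
	maxLeaseIndex uint64 // zeroed so a fresh index is assigned on flush
	encoded       []byte // zeroed so the command is re-encoded
}

// repropose sketches the restored control flow of tryReproposeWithNewLeaseIndex
// after the supersession guard has passed.
func repropose(p *proposalSketch, minTS int64, propose func(*proposalSketch) error) error {
	if p.writeTS <= minTS {
		// The tracker wants the timestamp forwarded, which would require
		// re-evaluation, so give up with a digestible error.
		return errNotLeaseHolder
	}
	p.maxLeaseIndex = 0
	p.encoded = nil
	return propose(p)
}

func main() {
	p := &proposalSketch{writeTS: 10, maxLeaseIndex: 100, encoded: []byte("cmd")}
	err := repropose(p, 5, func(*proposalSketch) error { return nil })
	fmt.Println(err, p.maxLeaseIndex, p.encoded == nil) // <nil> 0 true

	// A write at or below the tracker's minimum timestamp is not reproposed.
	stale := &proposalSketch{writeTS: 3}
	fmt.Println(repropose(stale, 5, nil)) // reproposal failed due to closed timestamp
}
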