kvserver: revert recent changes to reproposals #98481

Merged · 11 commits · Mar 14, 2023
33 changes: 9 additions & 24 deletions pkg/kv/kvserver/apply/task.go
@@ -248,30 +248,15 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {

iter := t.dec.NewCommandIter()
for iter.Valid() {
err := t.applyOneBatch(ctx, iter)
if err != nil {
if errors.Is(err, ErrRemoved) {
// On ErrRemoved, we know that the replica has been destroyed and in
// particular, the Replica's proposals map has already been cleared out.
// But there may be unfinished proposals that are only known to the
// current Task (because we remove proposals we're about to apply from the
// map). To avoid leaking resources and/or leaving proposers hanging,
// finish them here. Note that it is important that we know that the
// proposals map is (and always will be, due to replicaGC setting the
// destroy status) empty at this point, since there is an invariant
// that all proposals in the map are unfinished, and the Task has only
// removed a subset[^1] of the proposals that might be finished below.
// But since it's empty, we can finish them all without having to
// check which ones are no longer in the map.
//
// NOTE: forEachCmdIter closes iter.
//
// [^1]: (*replicaDecoder).retrieveLocalProposals
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
if err := t.applyOneBatch(ctx, iter); err != nil {
// If the batch threw an error, reject all remaining commands in the
// iterator to avoid leaking resources or leaving a proposer hanging.
//
// NOTE: forEachCmdIter closes iter.
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
return err
}
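
For readers skimming the revert, here is a minimal, self-contained Go sketch of the control flow ApplyCommittedEntries ends up with: if applying one batch fails, every command still left in the iterator is finished with that same error so no proposer is left hanging, and the original error is then returned. The names below (command, applyOne, ackErrAndFinish, a slice standing in for the command iterator) are illustrative assumptions, not CockroachDB code.

package main

import (
	"context"
	"errors"
	"fmt"
)

// command stands in for an unfinished local proposal with a waiting proposer.
type command struct {
	id   int
	done chan error
}

// ackErrAndFinish signals the waiting proposer with the given error, playing
// the role of Command.AckErrAndFinish in the diff above.
func (c *command) ackErrAndFinish(_ context.Context, err error) error {
	c.done <- err
	return nil
}

// applyOne pretends to apply a single command; command 2 fails.
func applyOne(_ context.Context, c *command) error {
	if c.id == 2 {
		return errors.New("apply failed")
	}
	c.done <- nil
	return nil
}

// applyAll mirrors the loop shape: on the first failure, reject everything
// still pending, then surface the original error.
func applyAll(ctx context.Context, cmds []*command) error {
	for i, c := range cmds {
		if err := applyOne(ctx, c); err != nil {
			for _, rest := range cmds[i:] {
				if rejectErr := rest.ackErrAndFinish(ctx, err); rejectErr != nil {
					return rejectErr
				}
			}
			return err
		}
	}
	return nil
}

func main() {
	cmds := []*command{
		{id: 1, done: make(chan error, 1)},
		{id: 2, done: make(chan error, 1)},
		{id: 3, done: make(chan error, 1)},
	}
	fmt.Println("applyAll:", applyAll(context.Background(), cmds))
	for _, c := range cmds {
		fmt.Printf("cmd %d finished with: %v\n", c.id, <-c.done)
	}
}
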
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -501,7 +501,6 @@ type Replica struct {
// Instead, the buffer internally holds a reference to mu and will use
// it appropriately.
proposalBuf propBuf

// proposals stores the Raft in-flight commands which originated at this
// Replica, i.e. all commands for which propose has been called, but which
// have not yet applied. A proposal is "pending" until it is "finalized",
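
The proposals map described in the comment above is easier to picture with a loose sketch. The replica and proposalData types below are invented for illustration; they only capture the invariant that the map holds unfinished ("pending") proposals originating at this replica and that an entry leaves the map once its proposal is finished.

package main

import "fmt"

// proposalData is an illustrative stand-in for the real ProposalData.
type proposalData struct{ finished bool }

// replica is an illustrative stand-in holding only the pending-proposals map.
type replica struct {
	proposals map[string]*proposalData // invariant: only unfinished proposals
}

// propose registers an in-flight command under its ID.
func (r *replica) propose(id string) *proposalData {
	p := &proposalData{}
	r.proposals[id] = p
	return p
}

// finish finalizes the proposal and stops tracking it.
func (r *replica) finish(id string) {
	if p, ok := r.proposals[id]; ok {
		p.finished = true
		delete(r.proposals, id)
	}
}

func main() {
	r := &replica{proposals: map[string]*proposalData{}}
	r.propose("cmd-1")
	fmt.Println("pending:", len(r.proposals)) // 1
	r.finish("cmd-1")
	fmt.Println("pending:", len(r.proposals)) // 0
}
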
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_application_decoder.go
@@ -111,8 +111,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
// criterion. While such proposals can be reproposed, only the first
// instance that gets applied matters and so removing the command is
// always what we want to happen.
!cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex)

cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
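
The shouldRemove conjunct restored here is equivalent to the Supersedes-based spelling it replaces: Supersedes(idx) was defined as "the proposal now tracks a MaxLeaseIndex different from idx", so negating it yields exactly the equality check. A tiny Go sketch with an invented proposal type (not the real ProposalData) makes the equivalence explicit.

package main

import "fmt"

// proposal is an illustrative stand-in for ProposalData.
type proposal struct{ maxLeaseIndex uint64 }

// supersedes mirrors the removed helper: the proposal supersedes a log entry
// iff it now tracks a different (reproposed) MaxLeaseIndex.
func (p proposal) supersedes(entryMaxLeaseIndex uint64) bool {
	return p.maxLeaseIndex != entryMaxLeaseIndex
}

func main() {
	p := proposal{maxLeaseIndex: 7}
	for _, entryIdx := range []uint64{7, 9} {
		oldForm := !p.supersedes(entryIdx)     // spelling removed by the revert
		newForm := entryIdx == p.maxLeaseIndex // spelling the revert restores
		fmt.Println(entryIdx, oldForm, newForm, oldForm == newForm)
	}
}
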
152 changes: 61 additions & 91 deletions pkg/kv/kvserver/replica_application_result.go
@@ -105,16 +105,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
cmd.response.Err = pErr
case kvserverbase.ProposalRejectionIllegalLeaseIndex:
// Reset the error as it's now going to be determined by the outcome of
// reproposing (or not).
// reproposing (or not); note that tryReproposeWithNewLeaseIndex will
// return `nil` if the entry is not eligible for reproposals.
//
// Note that if pErr remains nil, we will mark the proposal as non-local at
// the end of this block and return, so we're not hitting an NPE near the end
// of this method where we're attempting to reach into `cmd.proposal`.
// If pErr gets "reset" here as a result, we will mark the proposal as
// non-local at the end of this block and return, so we're not hitting an
// NPE near the end of this method where we're attempting to reach into
// `cmd.proposal`.
//
// This control flow is sketchy but it preserves existing behavior
// that would be too risky to change at the time of writing.
//
pErr = nil
// If we failed to apply at the right lease index, try again with a
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
@@ -162,10 +163,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
//
// These many possible worlds are a major source of complexity, a
// reduction of which is postponed.
if !cmd.proposal.applied && !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))
}

pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
// entry is not superseded (i.e. we don't have a reproposal at a higher
@@ -183,40 +181,29 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// It is thus safe to signal the error back to the client, which is also
// the only sensible choice at this point.
//
// We also know that the proposal is not in the proposals map, since the
// command is local and wasn't superseded, which is the condition in
// retrieveLocalProposals for removing from the map. So we're not leaking
// a map entry here, which we assert against below (and which has coverage,
// at time of writing, through TestReplicaReproposalWithNewLeaseIndexError).
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
cmd.response.Err = pErr

// This assertion can mis-fire if we artificially inject invalid LAIs
// during proposal, see the explanatory comment above. If, in the
// current app batch, we had two identical copies of an entry (which
// maps to a local proposal), and the LAI was stale, then both entries
// would be local. The first could succeed to repropose, which, if the
// propBuf was full, would immediately insert into the proposals map.
// Normally, when we then apply the second entry, it would be superseded
// and not hit the assertion. But, if we injected a stale LAI during
// this last reproposal, we could "accidentally" assign the same LAI
// again. The second entry would then try to repropose again, which is
// fine, but it could bump into the closed timestamp, get an error,
// enter this branch, and then trip the assertion.
// Note that the proposal may or may not be in the proposals map at this
// point. For example, if we artificially inject invalid LAIs during
// proposal, see the explanatory comment above. If, in the current app
// batch, we had two identical copies of an entry (which maps to a local
// proposal), and the LAI was stale, then both entries would be local.
// The first could succeed to repropose, which, if the propBuf was full,
// would immediately insert into the proposals map. Normally, when we
// then apply the second entry, it would be superseded and not hit the
// assertion. But, if we injected a stale LAI during this last
// reproposal, we could "accidentally" assign the same LAI again. The
// second entry would then try to repropose again, which is fine, but it
// could bump into the closed timestamp, get an error, and now we are in
// a situation where a reproposal attempt failed with the proposal being
// present in the map.
//
// For proposed simplifications, see:
// https://github.com/cockroachdb/cockroach/issues/97633
r.mu.RLock()
_, inMap := r.mu.proposals[cmd.ID]
r.mu.RUnlock()

if inMap {
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
cmd.response.Err = pErr
} else {
// Unbind the entry's local proposal because we just succeeded in
// reproposing it or decided not to repropose. Either way, we don't want
// to acknowledge the client yet.
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
// yet.
cmd.proposal = nil
return
}
@@ -237,82 +224,65 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

// reproposer is used by tryReproposeWithNewLeaseIndex.
type reproposer interface {
trackEvaluatingRequest(context.Context, hlc.Timestamp) (hlc.Timestamp, TrackedRequestToken)
propose(context.Context, *ProposalData, TrackedRequestToken) *kvpb.Error
newNotLeaseHolderError(string) *kvpb.NotLeaseHolderError
}

type replicaReproposer Replica

var _ reproposer = (*replicaReproposer)(nil)

func (r *replicaReproposer) trackEvaluatingRequest(
ctx context.Context, wts hlc.Timestamp,
) (hlc.Timestamp, TrackedRequestToken) {
// NB: must not hold r.mu here, the propBuf acquires it itself.
return r.mu.proposalBuf.TrackEvaluatingRequest(ctx, wts)
}

func (r *replicaReproposer) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) *kvpb.Error {
return (*Replica)(r).propose(ctx, p, tok)
}

func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHolderError {
r.mu.RLock()
defer r.mu.RUnlock()
return kvpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
msg,
)
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// The caller must already have checked that the entry is local and not
// superseded, and that it was rejected with an illegal lease index error.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd,
) *kvpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}

// We need to track the request again in order to protect its timestamp until
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
// requests (e.g. EndTxn) don't care about closed timestamps.
minTS, tok := r.trackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go back to DistSender, so send something it can digest.
return kvpb.NewError(r.newNotLeaseHolderError("reproposal failed due to closed timestamp"))
// will go back to DistSender, so send something it can digest.
err := kvpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
return kvpb.NewError(err)
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

// Reset the command for reproposal.
prevMaxLeaseIndex := p.command.MaxLeaseIndex
prevEncodedCommand := p.encodedCommand
p.command.MaxLeaseIndex = 0
p.encodedCommand = nil
pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
// On error, reset the fields we zeroed out to their old value.
// This ensures that the proposal doesn't count as Superseded
// now.
p.command.MaxLeaseIndex = prevMaxLeaseIndex
p.encodedCommand = prevEncodedCommand
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x", cmd.ID)
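
To summarize the gate that the restored tryReproposeWithNewLeaseIndex applies before reproposing, here is a compact Go sketch; proposalData, shouldRepropose, and repropose are hypothetical names for illustration, not the real API. A rejected entry is only reproposed if its proposal has not applied yet and no copy with a newer MaxLeaseIndex is already in flight; a successful repropose changes the proposal's MaxLeaseIndex, which is what later marks the older entry as superseded.

package main

import "fmt"

// proposalData is an illustrative stand-in for the real ProposalData.
type proposalData struct {
	applied       bool
	maxLeaseIndex uint64
}

// shouldRepropose reports whether an entry decoded with entryMLI may be
// reproposed under a fresh MaxLeaseIndex.
func shouldRepropose(p *proposalData, entryMLI uint64) bool {
	if p.applied {
		// Reproposing an already-applied command could apply it twice.
		return false
	}
	if entryMLI != p.maxLeaseIndex {
		// A copy with a newer MaxLeaseIndex already exists; don't create more.
		return false
	}
	return true
}

// repropose pretends to re-submit the proposal; in the real code the proposal
// buffer assigns the next MaxLeaseIndex.
func repropose(p *proposalData, next uint64) {
	p.maxLeaseIndex = next
}

func main() {
	p := &proposalData{maxLeaseIndex: 7}
	fmt.Println(shouldRepropose(p, 7)) // true: this entry owns the proposal
	repropose(p, 8)
	fmt.Println(shouldRepropose(p, 7)) // false: the entry at MLI 7 is now superseded
}
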
60 changes: 24 additions & 36 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
"go.etcd.io/raft/v3"
)
@@ -151,26 +150,6 @@ func (sm *replicaStateMachine) NewBatch() apply.Batch {
return b
}

func formatReplicatedCmd(cmd *replicatedCmd) redact.RedactableString {
var buf redact.StringBuilder
// We need to zero various data structures that would otherwise
// cause panics in `pretty.Sprint`.
var pd ProposalData
if cmd.proposal != nil {
pd = *cmd.proposal
pd.ctx = nil
pd.sp = nil
pd.command.TraceData = nil
pd.quotaAlloc = nil
pd.tok = TrackedRequestToken{}
pd.ec = endCmds{}
}

// NB: this redacts very poorly, but this is considered acceptable for now.
redact.Fprintf(&buf, "cmd:%s\n\nproposal: %s", pretty.Sprint(cmd.ReplicatedCmd), pretty.Sprint(pd))
return buf.RedactableString()
}

// ApplySideEffects implements the apply.StateMachine interface. The method
// handles the third phase of applying a command to the replica state machine.
//
@@ -244,21 +223,30 @@ func (sm *replicaStateMachine) ApplySideEffects(
sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult)
}

if !cmd.Rejected() {
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
if cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If an entry is superseded but it wasn't rejected, something is wrong.
// The superseding reproposal could apply as well, leading to doubly applying
// a command.
log.Fatalf(ctx, "applying superseded proposal: %s", formatReplicatedCmd(cmd))
}
rejected := cmd.Rejected()
higherReproposalsExist := cmd.Cmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex
if !rejected && higherReproposalsExist {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if !rejected && cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
// If any reproposals at a higher MaxLeaseIndex exist we know that they will
// never successfully apply, remove them from the map to avoid future
// reproposals. If there is no command referencing this proposal at a higher
// MaxLeaseIndex then it will already have been removed (see
// shouldRemove in replicaDecoder.retrieveLocalProposals()). It is possible
// that a later command in this batch referred to this proposal but it must
// have failed because it carried the same MaxLeaseIndex.
if higherReproposalsExist {
sm.r.mu.Lock()
delete(sm.r.mu.proposals, cmd.ID)
sm.r.mu.Unlock()
}
cmd.proposal.applied = true
}
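
The restored ApplySideEffects bookkeeping can be reduced to a short sketch: a command that applies successfully while reproposals at a higher MaxLeaseIndex exist (or while its proposal is already marked applied) indicates a bug, and when such higher reproposals do exist for a rejected command the proposal is dropped from the pending map so it is not reproposed again. The replica and proposal types below are invented for illustration and ignore locking and the real command plumbing.

package main

import (
	"fmt"
	"log"
)

// proposal and replica are illustrative stand-ins, not the real kvserver types.
type proposal struct {
	maxLeaseIndex uint64
	applied       bool
}

type replica struct {
	proposals map[string]*proposal // pending proposals keyed by command ID
}

func (r *replica) applySideEffects(id string, entryMLI uint64, rejected bool) {
	p, ok := r.proposals[id]
	if !ok {
		return // not a local proposal we still track
	}
	higherReproposalsExist := entryMLI != p.maxLeaseIndex
	if !rejected && higherReproposalsExist {
		log.Fatal("finishing proposal with outstanding reproposal at a higher max lease index")
	}
	if !rejected && p.applied {
		log.Fatal("command already applied; unexpected successful result")
	}
	if higherReproposalsExist {
		// Mirrors the restored behavior: stop tracking the proposal so it is
		// not reproposed again.
		delete(r.proposals, id)
	}
	p.applied = true
}

func main() {
	r := &replica{proposals: map[string]*proposal{"a": {maxLeaseIndex: 7}}}
	// A stale copy at MaxLeaseIndex 5 is rejected while a reproposal at 7 is
	// still in flight: no assertion fires, and the map entry is removed.
	r.applySideEffects("a", 5, true)
	fmt.Println("pending proposals:", len(r.proposals)) // 0
}
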
14 changes: 0 additions & 14 deletions pkg/kv/kvserver/replica_proposal.go
@@ -127,20 +127,6 @@ type ProposalData struct {
tok TrackedRequestToken
}

// Supersedes takes the MaxLeaseIndex of a RaftCommand obtained from a log
// entry. It returns true if the ProposalData tracks a different MaxIndex,
// implying that the log entry has been reproposed under an updated
// MaxLeaseIndex. This implies that the current log entry will have been
// rejected and should not be reproposed.
//
// Note that some commands such as lease requests (but not transfers) don't use
// MaxLeaseIndex. For these, false will be returned.
//
// See (*Replica).mu.proposals for a detailed explanation of reproposals.
func (proposal *ProposalData) Supersedes(entryMaxLeaseIndex uint64) bool {
return proposal.command.MaxLeaseIndex != entryMaxLeaseIndex
}

// finishApplication is called when a command application has finished. The
// method will be called downstream of Raft if the command required consensus,
// but can be called upstream of Raft if the command did not and was never