Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: prevent finished proposal from being present in proposals map #94825

Merged
merged 2 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,30 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {

iter := t.dec.NewCommandIter()
for iter.Valid() {
if err := t.applyOneBatch(ctx, iter); err != nil {
// If the batch threw an error, reject all remaining commands in the
// iterator to avoid leaking resources or leaving a proposer hanging.
//
// NOTE: forEachCmdIter closes iter.
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
err := t.applyOneBatch(ctx, iter)
if err != nil {
if errors.Is(err, ErrRemoved) {
// On ErrRemoved, we know that the replica has been destroyed and in
// particular, the Replica's proposals map has already been cleared out.
// But there may be unfinished proposals that are only known to the
// current Task (because we remove proposals we're about to apply from the
// map). To avoid leaking resources and/or leaving proposers hanging,
// finish them here. Note that it is important that we know that the
// proposals map is (and always will be, due to replicaGC setting the
// destroy status) empty at this point, since there is an invariant
// that all proposals in the map are unfinished, and the Task has only
// removed a subset[^1] of the proposals that might be finished below.
// But since it's empty, we can finish them all without having to
// check which ones are no longer in the map.
//
// NOTE: forEachCmdIter closes iter.
//
// [^1]: (*replicaDecoder).retrieveLocalProposals
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
}
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -1187,6 +1188,9 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
if p.createdAtTicks == 0 {
p.createdAtTicks = rp.mu.ticks
}
if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) {
log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p)
}
rp.mu.proposals[p.idKey] = p
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,24 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)

// NB: we need to have flushed the proposals before each application cycle
// because due to reproposals it is possible to have a proposal that
//
// - is going to be applied in this raft cycle, and
// - is not in the proposals map, and
// - is in the proposals buffer.
//
// The current structure of the code makes sure that by the time we apply the
// entry, the in-mem proposal has moved from the proposal buffer to the proposals
// map. Without this property, we could have the following interleaving:
//
// - proposal is in map (initial state)
// - refreshProposalsLocked adds it to the proposal buffer again
// - proposal applies with an error: removes it from map, finishes proposal
// - proposal buffer flushes, inserts proposal into map
// - we now have a finished proposal in the proposal map, an invariant violation.
//
// See Replica.mu.proposals.
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
if err != nil {
return false, err
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7990,6 +7990,24 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
t.Fatal(pErr)
}

g, _, pErr := r.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
LatchSpans: spanset.New(),
LockSpans: spanset.New(),
}, concurrency.PessimisticEval)
require.NoError(t, pErr.GoError())

cmd.ec = endCmds{
repl: r,
g: g,
}

dropProposals.Lock()
dropProposals.m[cmd] = struct{}{} // silently drop proposals
dropProposals.Unlock()
Expand Down Expand Up @@ -8105,6 +8123,19 @@ func TestReplicaRefreshMultiple(t *testing.T) {
ba.Add(inc)
ba.Timestamp = tc.Clock().Now()

g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
LatchSpans: spanset.New(),
LockSpans: spanset.New(),
}, concurrency.PessimisticEval)
require.NoError(t, pErr.GoError())

st := repl.CurrentLeaseStatus(ctx)
proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{})
if pErr != nil {
Expand All @@ -8113,6 +8144,11 @@ func TestReplicaRefreshMultiple(t *testing.T) {
// Save this channel; it may get reset to nil before we read from it.
proposalDoneCh := proposal.doneCh

proposal.ec = endCmds{
repl: repl,
g: g,
}

repl.mu.Lock()
ai := repl.mu.state.LeaseAppliedIndex
if ai <= 1 {
Expand Down