From c5b42a7f363c165f35ff838e77c96be9dee83475 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:38:16 +0100 Subject: [PATCH 01/14] Revert "kvserver: disable assertion 'finished proposal inserted'" Reverts #97606. This reverts commit f95866d3c8df8e0d3ff58369c98dc007066d95df. --- pkg/kv/kvserver/replica_proposal_buf.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index ecc7a1f6ca7f..5b477d257bb8 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -1189,10 +1189,7 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { if p.createdAtTicks == 0 { p.createdAtTicks = rp.mu.ticks } - // TODO(tbg): this assertion fires. Figure out why. See: - // https://github.com/cockroachdb/cockroach/issues/97605 - const enableAssertion = false - if enableAssertion && buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) { + if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) { log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p) } if prev := rp.mu.proposals[p.idKey]; prev != nil && prev != p { From 70894d23900ace36ae2b55c4fafe46236de147de Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:50:11 +0100 Subject: [PATCH 02/14] Revert "kvserver: narrow down 'finishing a proposal with outstanding reproposal'" Reverts #97564. This reverts commit 554a5d5e28b205d78242d23bc827e60f8033453c. --- .../replica_application_state_machine.go | 60 ++++++++----------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 433840984cb7..baf87b49268f 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" "github.com/kr/pretty" "go.etcd.io/raft/v3" ) @@ -151,26 +150,6 @@ func (sm *replicaStateMachine) NewBatch() apply.Batch { return b } -func formatReplicatedCmd(cmd *replicatedCmd) redact.RedactableString { - var buf redact.StringBuilder - // We need to zero various data structures that would otherwise - // cause panics in `pretty.Sprint`. - var pd ProposalData - if cmd.proposal != nil { - pd = *cmd.proposal - pd.ctx = nil - pd.sp = nil - pd.command.TraceData = nil - pd.quotaAlloc = nil - pd.tok = TrackedRequestToken{} - pd.ec = endCmds{} - } - - // NB: this redacts very poorly, but this is considered acceptable for now. - redact.Fprintf(&buf, "cmd:%s\n\nproposal: %s", pretty.Sprint(cmd.ReplicatedCmd), pretty.Sprint(pd)) - return buf.RedactableString() -} - // ApplySideEffects implements the apply.StateMachine interface. The method // handles the third phase of applying a command to the replica state machine. // @@ -244,21 +223,30 @@ func (sm *replicaStateMachine) ApplySideEffects( sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult) } - if !cmd.Rejected() { - if cmd.proposal.applied { - // If the command already applied then we shouldn't be "finishing" its - // application again because it should only be able to apply successfully - // once. 
We expect that when any reproposal for the same command attempts - // to apply it will be rejected by the below raft lease sequence or lease - // index check in checkForcedErr. - log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) - } - if cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) { - // If an entry is superseded but it wasn't rejected, something is wrong. - // The superseding reproposal could apply as well, leading to doubly applying - // a command. - log.Fatalf(ctx, "applying superseded proposal: %s", formatReplicatedCmd(cmd)) - } + if higherReproposalsExist := cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex); higherReproposalsExist { + // If the command wasn't rejected, we just applied it and no higher + // reproposal must exist (since that one may also apply). + // + // If the command was rejected with ProposalRejectionPermanent, no higher + // reproposal should exist (after all, whoever made that reproposal should + // also have seen a permanent rejection). + // + // If it was rejected with ProposalRejectionIllegalLeaseIndex, then the + // subsequent call to tryReproposeWithNewLeaseIndex[^1] must have returned an + // error (or the proposal would not be IsLocal() now). But that call + // cannot return an error for a proposal that is already superseded + // initially. + // + // [^1]: see (*replicaDecoder).retrieveLocalProposals() + log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index: %+v", cmd) + } + if !cmd.Rejected() && cmd.proposal.applied { + // If the command already applied then we shouldn't be "finishing" its + // application again because it should only be able to apply successfully + // once. We expect that when any reproposal for the same command attempts + // to apply it will be rejected by the below raft lease sequence or lease + // index check in checkForcedErr. + log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) } cmd.proposal.applied = true } From fcc87423de94178724418226d464ccc65e99ad57 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:50:25 +0100 Subject: [PATCH 03/14] Revert "kvserver: fill gaps in comment near tryReproposeWithNewLeaseIndex" Reverts #97564. This reverts commit 6283099e89196b1043e322e3dbd9238147852bde. --- pkg/kv/kvserver/replica_application_result.go | 50 ++----------------- 1 file changed, 4 insertions(+), 46 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index d8b97b555244..a0ad9beca3f4 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -115,11 +115,6 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // that would be too risky to change at the time of writing. // pErr = nil - // If we failed to apply at the right lease index, try again with a - // new one. This is important for pipelined writes, since they don't - // have a client watching to retry, so a failure to eventually apply - // the proposal would be a user-visible error. - // // If the command associated with this rejected raft entry already applied // then we don't want to repropose it. Doing so could lead to duplicate // application of the same proposal. (We can see hit this case if an application @@ -136,33 +131,11 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // multiple lease indexes, so don't repropose it again. 
This ensures that at // any time, there is only up to a single lease index that has a chance of // succeeding in the Raft log for a given command. - // - // Taking a looking glass to the last paragraph, we see that the situation - // is much more subtle. As tryReproposeWithNewLeaseIndex gets to the stage - // where it calls `(*Replica).propose` (i.e. it passed the closed - // timestamp checks), it *resets* `cmd.proposal.MaxLeaseIndex` to zero. - // This means that the proposal immediately supersedes any other copy - // presently in the log, including for the remainder of application of the - // current log entry (since the current entry's LAI is certainly not equal - // to zero). However, the proposal buffer adds another layer of - // possibility on top of this. Typically, the buffer does not flush - // immediately: this only happens at the beginning of the *next* raft - // handling cycle, i.e. the proposal will not re-enter the proposals map - // while the current batches of commands (recall, which may contain an - // arbitrary number of copies of the current command, both with various - // LAIs all but at most one of which are too small) are applied. *But*, - // the proposal buffer has a capacity, and if it does fill up in - // `(*Replica).propose` it will synchronously flush, meaning that a new - // LAI will be assigned to `cmd.proposal.MaxLeaseIndex` AND the command - // will have re-entered the proposals map. So even if we remove the - // "zeroing" of the MaxLeaseIndex prior to proposing, we still have to - // contend with the fact that once `tryReproposeWithNewLeaseIndex` may - // or may not be in the map. (At least it is assigned a new LAI if and - // only if it is back in the map!). - // - // These many possible worlds are a major source of complexity, a - // reduction of which is postponed. if !cmd.proposal.applied && !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) { + // If we failed to apply at the right lease index, try again with a + // new one. This is important for pipelined writes, since they don't + // have a client watching to retry, so a failure to eventually apply + // the proposal would be a user-visible error. pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r)) } @@ -191,21 +164,6 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr) cmd.response.Err = pErr - // This assertion can mis-fire if we artificially inject invalid LAIs - // during proposal, see the explanatory comment above. If, in the - // current app batch, we had two identical copies of an entry (which - // maps to a local proposal), and the LAI was stale, then both entries - // would be local. The first could succeed to repropose, which, if the - // propBuf was full, would immediately insert into the proposals map. - // Normally, when we then apply the second entry, it would be superseded - // and not hit the assertion. But, if we injected a stale LAI during - // this last reproposal, we could "accidentally" assign the same LAI - // again. The second entry would then try to repropose again, which is - // fine, but it could bump into the closed timestamp, get an error, - // enter this branch, and then trip the assertion. 
- // - // For proposed simplifications, see: - // https://github.com/cockroachdb/cockroach/issues/97633 r.mu.RLock() _, inMap := r.mu.proposals[cmd.ID] r.mu.RUnlock() From 9a4438428ec5246cd59340a1573fbc79d13cacb3 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:50:35 +0100 Subject: [PATCH 04/14] Revert "kvserver: hoist early return out of tryReproposeWithNewLeaseIndex" Reverts #97564. This reverts commit 548b2f3552a3324f6c1d524f0fc8a50591907c38. --- pkg/kv/kvserver/replica_application_result.go | 77 +++++++++---------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index a0ad9beca3f4..21681a1c6fc2 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -104,40 +104,11 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { case kvserverbase.ProposalRejectionPermanent: cmd.response.Err = pErr case kvserverbase.ProposalRejectionIllegalLeaseIndex: - // Reset the error as it's now going to be determined by the outcome of - // reproposing (or not). - // - // Note that if pErr remains nil, we will mark the proposal as non-local at - // the end of this block and return, so we're not hitting an NPE near the end - // of this method where we're attempting to reach into `cmd.proposal`. - // - // This control flow is sketchy but it preserves existing behavior - // that would be too risky to change at the time of writing. - // - pErr = nil - // If the command associated with this rejected raft entry already applied - // then we don't want to repropose it. Doing so could lead to duplicate - // application of the same proposal. (We can see hit this case if an application - // batch contains multiple copies of the same proposal, in which case they are - // all marked as local, the first one will apply (and set p.applied) and the - // remaining copies will hit this branch). - // - // Similarly, if the proposal associated with this rejected raft entry is - // superseded by a different (larger) MaxLeaseIndex than the one we decoded - // from the entry itself, the command must have already passed through - // tryReproposeWithNewLeaseIndex previously (this can happen if there are - // multiple copies of the command in the logs; see - // TestReplicaRefreshMultiple). We must not create multiple copies with - // multiple lease indexes, so don't repropose it again. This ensures that at - // any time, there is only up to a single lease index that has a chance of - // succeeding in the Raft log for a given command. - if !cmd.proposal.applied && !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) { - // If we failed to apply at the right lease index, try again with a - // new one. This is important for pipelined writes, since they don't - // have a client watching to retry, so a failure to eventually apply - // the proposal would be a user-visible error. - pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r)) - } + // If we failed to apply at the right lease index, try again with a + // new one. This is important for pipelined writes, since they don't + // have a client watching to retry, so a failure to eventually apply + // the proposal would be a user-visible error. 
+ pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r)) if pErr != nil { // An error from tryReproposeWithNewLeaseIndex implies that the current @@ -172,9 +143,9 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd) } } else { - // Unbind the entry's local proposal because we just succeeded in - // reproposing it or decided not to repropose. Either way, we don't want - // to acknowledge the client yet. + // Unbind the entry's local proposal because we just succeeded + // in reproposing it and we don't want to acknowledge the client + // yet. cmd.proposal = nil return } @@ -236,12 +207,40 @@ func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHol // observed all applied entries between proposal and the lease index becoming // invalid, as opposed to skipping some of them by applying a snapshot). // -// The caller must already have checked that the entry is local and not -// superseded, and that it was rejected with an illegal lease index error. +// Returns a nil error if the command has already been successfully applied or +// has been reproposed here or by a different entry for the same proposal that +// hit an illegal lease index error. +// +// If this returns a nil error once, it will return a nil error for future calls +// as well, assuming that trackEvaluatingRequest returns monotonically increasing +// timestamps across subsequent calls. func tryReproposeWithNewLeaseIndex( ctx context.Context, cmd *replicatedCmd, r reproposer, ) *kvpb.Error { + // Note that we don't need to validate anything about the proposal's + // lease here - if we got this far, we know that everything but the + // index is valid at this point in the log. p := cmd.proposal + if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) { + // If the command associated with this rejected raft entry already applied + // then we don't want to repropose it. Doing so could lead to duplicate + // application of the same proposal. (We can see hit this case if an application + // batch contains multiple copies of the same proposal, in which case they are + // all marked as local, the first one will apply (and set p.applied) and the + // remaining copies will hit this branch). + // + // Similarly, if the proposal associated with this rejected raft entry is + // superseded by a different (larger) MaxLeaseIndex than the one we decoded + // from the entry itself, the command must have already passed through + // tryReproposeWithNewLeaseIndex previously (this can happen if there are + // multiple copies of the command in the logs; see + // TestReplicaRefreshMultiple). We must not create multiple copies with + // multiple lease indexes, so don't repropose it again. This ensures that at + // any time, there is only up to a single lease index that has a chance of + // succeeding in the Raft log for a given command. + return nil + } + // We need to track the request again in order to protect its timestamp until // it gets reproposed. // TODO(andrei): Only track if the request consults the ts cache. Some From 500bc8261f3ddd7918b2fd54c417391d5685348c Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:51:03 +0100 Subject: [PATCH 05/14] Revert "fixup! kvserver: prevent finished proposal from being present in proposals map" Reverts #94825. This reverts commit ac23f4667f5bb6ef03f95cdff5691d8f1069d140. 
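As an illustrative aside, separate from the diffs in this series: the early return that PATCH 04/14 moves back inside tryReproposeWithNewLeaseIndex boils down to a two-part guard. The sketch below is a minimal, self-contained approximation using invented stand-in types, not the real ProposalData or RaftCommand definitions.

    package main

    import "fmt"

    // proposal is a simplified stand-in for the in-memory ProposalData tracked
    // in the proposals map. maxLeaseIndex models the highest MaxLeaseIndex
    // assigned to any copy of the command so far.
    type proposal struct {
    	applied       bool
    	maxLeaseIndex uint64
    }

    // shouldRepropose mirrors the restored early return: a rejected entry is
    // only eligible for reproposal under a new lease index if the proposal has
    // not already applied and the entry carries the proposal's current
    // (maximum) MaxLeaseIndex, i.e. no newer copy is already in flight.
    func shouldRepropose(p proposal, entryMaxLeaseIndex uint64) bool {
    	if p.applied {
    		// Already applied once; reproposing could double-apply the command.
    		return false
    	}
    	if p.maxLeaseIndex != entryMaxLeaseIndex {
    		// Superseded by a reproposal at a higher lease index.
    		return false
    	}
    	return true
    }

    func main() {
    	p := proposal{applied: false, maxLeaseIndex: 192}
    	fmt.Println(shouldRepropose(p, 100)) // false: stale, superseded copy
    	fmt.Println(shouldRepropose(p, 192)) // true: newest copy in the log
    }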
--- pkg/kv/kvserver/apply/task.go | 2 +- pkg/kv/kvserver/replica_test.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go index 76560969c0a9..c15dc31af9d6 100644 --- a/pkg/kv/kvserver/apply/task.go +++ b/pkg/kv/kvserver/apply/task.go @@ -272,8 +272,8 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error { }); rejectErr != nil { return rejectErr } + return err } - return err } } iter.Close() diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index e19e1cdcb197..de07e0d8b67a 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -8054,6 +8054,14 @@ func TestReplicaRefreshMultiple(t *testing.T) { ba.Add(inc) ba.Timestamp = tc.Clock().Now() + st := repl.CurrentLeaseStatus(ctx) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{}) + if pErr != nil { + t.Fatal(pErr) + } + // Save this channel; it may get reset to nil before we read from it. + proposalDoneCh := proposal.doneCh + g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{ Txn: ba.Txn, Timestamp: ba.Timestamp, @@ -8067,14 +8075,6 @@ func TestReplicaRefreshMultiple(t *testing.T) { }, concurrency.PessimisticEval) require.NoError(t, pErr.GoError()) - st := repl.CurrentLeaseStatus(ctx) - proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, allSpansGuard(), &st, uncertainty.Interval{}) - if pErr != nil { - t.Fatal(pErr) - } - // Save this channel; it may get reset to nil before we read from it. - proposalDoneCh := proposal.doneCh - proposal.ec = endCmds{ repl: repl, g: g, From e23a2ea5ba80bf3a45cf9e8cafc6cbe829d3e0a7 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:54:05 +0100 Subject: [PATCH 06/14] Revert "kvserver: prevent finished proposal from being present in proposals map" Reverts #94825. This reverts commit 15b1c6ae6058c8060eb8afe2110e582106380e96. --- pkg/kv/kvserver/apply/task.go | 35 +++++++----------------- pkg/kv/kvserver/replica_proposal_buf.go | 4 --- pkg/kv/kvserver/replica_raft.go | 18 ------------- pkg/kv/kvserver/replica_test.go | 36 ------------------------- 4 files changed, 10 insertions(+), 83 deletions(-) diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go index c15dc31af9d6..989258346681 100644 --- a/pkg/kv/kvserver/apply/task.go +++ b/pkg/kv/kvserver/apply/task.go @@ -248,32 +248,17 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error { iter := t.dec.NewCommandIter() for iter.Valid() { - err := t.applyOneBatch(ctx, iter) - if err != nil { - if errors.Is(err, ErrRemoved) { - // On ErrRemoved, we know that the replica has been destroyed and in - // particular, the Replica's proposals map has already been cleared out. - // But there may be unfinished proposals that are only known to the - // current Task (because we remove proposals we're about to apply from the - // map). To avoid leaking resources and/or leaving proposers hanging, - // finish them here. Note that it is important that we know that the - // proposals map is (and always will be, due to replicaGC setting the - // destroy status) empty at this point, since there is an invariant - // that all proposals in the map are unfinished, and the Task has only - // removed a subset[^1] of the proposals that might be finished below. - // But since it's empty, we can finish them all without having to - // check which ones are no longer in the map. 
- // - // NOTE: forEachCmdIter closes iter. - // - // [^1]: (*replicaDecoder).retrieveLocalProposals - if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error { - return cmd.AckErrAndFinish(ctx, err) - }); rejectErr != nil { - return rejectErr - } - return err + if err := t.applyOneBatch(ctx, iter); err != nil { + // If the batch threw an error, reject all remaining commands in the + // iterator to avoid leaking resources or leaving a proposer hanging. + // + // NOTE: forEachCmdIter closes iter. + if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error { + return cmd.AckErrAndFinish(ctx, err) + }); rejectErr != nil { + return rejectErr } + return err } } iter.Close() diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 5b477d257bb8..025ced87aa30 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1189,9 +1188,6 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { if p.createdAtTicks == 0 { p.createdAtTicks = rp.mu.ticks } - if buildutil.CrdbTestBuild && (p.ec.repl == nil || p.ec.g == nil) { - log.Fatalf(rp.store.AnnotateCtx(context.Background()), "finished proposal inserted into map: %+v", p) - } if prev := rp.mu.proposals[p.idKey]; prev != nil && prev != p { log.Fatalf(rp.store.AnnotateCtx(context.Background()), "two proposals under same ID:\n%+v,\n%+v", prev, p) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f67d5a989501..e9c1b16cb02f 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -743,24 +743,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup) - // NB: we need to have flushed the proposals before each application cycle - // because due to reproposals it is possible to have a proposal that - // - // - is going to be applied in this raft cycle, and - // - is not in the proposals map, and - // - is in the proposals buffer. - // - // The current structure of the code makes sure that by the time we apply the - // entry, the in-mem proposal has moved from the proposal buffer to the proposals - // map. Without this property, we could have the following interleaving: - // - // - proposal is in map (initial state) - // - refreshProposalsLocked adds it to the proposal buffer again - // - proposal applies with an error: removes it from map, finishes proposal - // - proposal buffer flushes, inserts proposal into map - // - we now have a finished proposal in the proposal map, an invariant violation. - // - // See Replica.mu.proposals. 
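As a side note on the apply/task.go hunks above: both the old and the restored versions of ApplyCommittedEntries share the same error-handling shape, which the hypothetical, self-contained sketch below reduces to its core (the command and iterator types here are invented for illustration, not the apply package API): when applying a batch fails, every command still pending is finished with that error so no proposer is left waiting.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // cmd stands in for an apply.Command with a local proposer waiting on it.
    type cmd struct{ id string }

    func (c cmd) ackErrAndFinish(err error) {
    	fmt.Printf("cmd %s finished with error: %v\n", c.id, err)
    }

    // applyAll applies commands in order (one per "batch" for brevity). If a
    // batch fails, the failing command and everything still queued behind it
    // are finished with the error so their proposers are signaled, not leaked.
    func applyAll(cmds []cmd, applyOne func(cmd) error) error {
    	for i, c := range cmds {
    		if err := applyOne(c); err != nil {
    			for _, rest := range cmds[i:] {
    				rest.ackErrAndFinish(err)
    			}
    			return err
    		}
    		fmt.Printf("cmd %s applied\n", c.id)
    	}
    	return nil
    }

    func main() {
    	errRemoved := errors.New("replica removed")
    	_ = applyAll([]cmd{{"a"}, {"b"}, {"c"}}, func(c cmd) error {
    		if c.id == "b" {
    			return errRemoved
    		}
    		return nil
    	})
    }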
numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup) if err != nil { return false, err diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index de07e0d8b67a..f1f8a5a36720 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -7921,24 +7921,6 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { t.Fatal(pErr) } - g, _, pErr := r.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{ - Txn: ba.Txn, - Timestamp: ba.Timestamp, - NonTxnPriority: ba.UserPriority, - ReadConsistency: ba.ReadConsistency, - WaitPolicy: ba.WaitPolicy, - LockTimeout: ba.LockTimeout, - Requests: ba.Requests, - LatchSpans: spanset.New(), - LockSpans: spanset.New(), - }, concurrency.PessimisticEval) - require.NoError(t, pErr.GoError()) - - cmd.ec = endCmds{ - repl: r, - g: g, - } - dropProposals.Lock() dropProposals.m[cmd] = struct{}{} // silently drop proposals dropProposals.Unlock() @@ -8062,24 +8044,6 @@ func TestReplicaRefreshMultiple(t *testing.T) { // Save this channel; it may get reset to nil before we read from it. proposalDoneCh := proposal.doneCh - g, _, pErr := repl.concMgr.SequenceReq(ctx, nil /* guard */, concurrency.Request{ - Txn: ba.Txn, - Timestamp: ba.Timestamp, - NonTxnPriority: ba.UserPriority, - ReadConsistency: ba.ReadConsistency, - WaitPolicy: ba.WaitPolicy, - LockTimeout: ba.LockTimeout, - Requests: ba.Requests, - LatchSpans: spanset.New(), - LockSpans: spanset.New(), - }, concurrency.PessimisticEval) - require.NoError(t, pErr.GoError()) - - proposal.ec = endCmds{ - repl: repl, - g: g, - } - repl.mu.Lock() ai := repl.mu.state.LeaseAppliedIndex if ai <= 1 { From 939f9552e8c5787506f589094888f52fca667175 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 11:58:29 +0100 Subject: [PATCH 07/14] Revert "kvserver: improve reproposal assertions and documentation" Reverts #97973. This reverts commit 866d58a62d792f963ff963235406b53b4ccf597f. --- pkg/kv/kvserver/replica.go | 156 +----------------- .../kvserver/replica_application_decoder.go | 3 +- pkg/kv/kvserver/replica_application_result.go | 110 ++++-------- .../replica_application_state_machine.go | 34 ++-- pkg/kv/kvserver/replica_proposal.go | 14 -- pkg/kv/kvserver/replica_raft.go | 47 ++---- 6 files changed, 72 insertions(+), 292 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 007a9d70d8ac..8f4ff92792e3 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -501,13 +501,9 @@ type Replica struct { // Instead, the buffer internally holds a reference to mu and will use // it appropriately. proposalBuf propBuf - - // proposals stores the Raft in-flight commands which originated at this - // Replica, i.e. all commands for which propose has been called, but which - // have not yet applied. A proposal is "pending" until it is "finalized", - // meaning that `finishApplication` has been invoked on the proposal (which - // informs the client that the proposal has now been applied, optionally - // with an error, which may be an AmbiguousResultError). + // proposals stores the Raft in-flight commands which originated at + // this Replica, i.e. all commands for which propose has been called, + // but which have not yet applied. // // The *ProposalData in the map are "owned" by it. Elements from the // map must only be referenced while the Replica.mu is held, except @@ -516,146 +512,12 @@ type Replica struct { // underneath raft. 
See comments on ProposalData fields for synchronization // requirements. // - // Due to Raft reproposals, multiple in-flight Raft entries can have the - // same CmdIDKey. There are two kinds of reproposals: - // - // (1) the exact same entry is handed to raft (possibly despite already being - // present in the log), usually after a timeout[^1]. - // - // (2) an existing proposal is updated with a new MaxLeaseIndex and handed to - // raft, i.e. we're intentionally creating a duplicate. This exists because - // for pipelined proposals, the client's goroutine returns without waiting - // for the proposal to apply.[^2][^3] When (2) is carried out, the existing - // copies of the proposal in the log will be "Superseded", see below. Note - // that (2) will only be invoked for proposals that aren't currently in the - // proposals map any more because they're in the middle of being applied; - // as part of (2), they are re-added to the map. - // - // To understand reproposals, we need a broad overview of entry application, - // which is batched (i.e. may process multiple log entries to be applied in - // a batched fashion). In entry application, the following steps are taken: - // - // 1. retrieve all local proposals: iterate through the entries in order, - // and look them up in the proposals map. For each "local" entry (i.e. - // tracked in the map), remove it from the map (unless the proposal - // is not superseded, see below) and attach the value to the entry. - // 2. for each entry: - // - stage written and in-memory effects of the entry (some may apply as no-ops - // if they fail below-raft checks such as the MaxLeaseIndex check) - // - Assuming the MaxLeaseIndex is violated and additional constraints are - // satisfied, carry out (2) from above. On success, we know now that there - // will be a reproposal in the log that can successfully apply. We unbind - // the local proposal (so we don't signal it) and apply the current entry - // as a no-op. - // 3. carry out additional side effects of the entire batch (stats updates etc). - // - // A prerequisite for (2) is that there currently aren't any copies of the proposal - // in the log that may ultimately apply, or we risk doubly applying commands - a - // correctness bug. After (2), any copies of the entry present in the log will have - // a MaxLeaseIndex strictly less than that of the in-memory command, and will be - // Superseded() by it. - // - // We can always safely create an identical copy (i.e. (1)) because of the - // replay protection conferred by the MaxLeaseIndex - all but the first - // proposal (that reach consensus) will be rejected (i.e. apply as a no-op). - // - // Naively, one might hope that by invoking (2) upon applying an entry for - // a command that is rejected due to a MaxLeaseIndex one could achieve the - // invariant that there is only ever one unapplied copy of the entry in the - // log, and then the in-memory proposal could reflect the MaxLeaseIndex - // assigned to this unapplied copy at all times. - // - // Unfortunately, for various reasons, this invariant does not hold: - // - entry application isn't durable, so upon a restart, we might roll - // back to a log position that yet has to catch up over multiple previous - // incarnations of (2), i.e. we will see the same entry multiple times at - // various MaxLeaseIndex values. - // (This technically not a problem, since we're losing the in-memory proposal - // during the restart anyway, but should be kept in mind anyway). 
- // - Raft proposal forwarding due to (1)-type reproposals could "in - // principle" lead to an old copy of the entry appearing again in the - // unapplied log, at least if we make the reasonable assumption that - // forwarded proposals may arrive at the leader with arbitrary delays. - // - // As a result, we can't "just" invoke (2) when seeing a rejected command, - // we additionally have to verify that there isn't a more recent reproposal - // underway that could apply successfully and supersedes the one we're - // currently looking at. - // So we carry out (2) only if the MaxLeaseIndex of the in-mem proposal matches - // that of the current entry, and update the in-mem MaxLeaseIndex with the result - // of (2) if it did. - // - // An example follows. Consider the following situation (where N is some base - // index not relevant to the example) in which we have one inflight proposal which - // has been triplicated in the log (due to [^1]): - // - // proposals[id] = p{Cmd{MaxLeaseIndex: 100, ...}} - // - // ... (unrelated entries) - // raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} - // ... (unrelated entries) - // raftlog[N+12] = (same as N) - // ... (unrelated entries) - // raftlog[N+15] = (same as N) - // - // where we assume that the `MaxLeaseIndex` 100 is invalid, i.e. when we see - // the first copy of the command being applied, we've already applied some - // command with equal or higher `MaxLeaseIndex`. In a world without - // mechanism (2), `N` would be rejected, and would finalize the proposal - // (i.e. signal the client with an error and remove the entry from - // `proposals`). Later, `N+12` and `N+15` would similarly be rejected (but - // they wouldn't even be regarded as local proposals any more due to not - // being present in `proposals`). - // - // However, (2) exists and it will engage during application of `N`: realizing - // that the current copies of the entry are all going to be rejected, it will - // modify the proposal by assigning a new `MaxLeaseIndex` to it, and handing - // it to `(*Replica).propose` again (which hands it to the proposal buffer, - // which will at some point flush it, leading to re-insertion into the raft - // log and the `proposals` map). The result will be this picture: - // - // proposals[id] = p{Cmd{MaxLeaseIndex: 192, ...}} <-- modified - // - // ... (unrelated entries) - // raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} <-- applied (as no-op) - // ... (unrelated entries) - // raftlog[N+12] = (same as N) (superseded) - // ... (unrelated entries) - // raftlog[N+15] = (same as N) (superseded) - // ... (unrelated entries) - // raftlog[N+18] = Cmd{MaxLeaseIndex: 192, ...} <-- modified - // - // `N+18` might (in fact, is likely to) apply successfully. As a result, when - // we consider `N+12` or `N+15` for application, we must *not* carry out (2) - // again, or we break replay protection. In other words, the `MaxLeaseIndex` - // of the command being applied must be compared with the `MaxLeaseIndex` of - // the command in the proposals map; only if they match do we know that this - // is the most recent (in MaxLeaseIndex order) copy of the command, and only - // then can (2) engage. In addition, an entry that doesn't pass this equality - // check must not signal the proposer and/or unlink from the proposals map (as a - // newer reproposal which might succeed is likely in the log)[^4]. 
- // - // Another way of framing the above is that `proposals[id].Cmd.MaxLeaseIndex` - // actually tracks the maximum `MaxLeaseIndex` of all copies that may be present in - // the log. - // - // If (2) results in an error (for example, since the proposal now fails to - // respect the closed timestamp), that error will finalize the proposal and - // is returned to the client. - // - // [^1]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_raft.go#L1224 - // [^2]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_application_result.go#L148 - // [^3]: it's debatable how useful this below-raft reproposal mechanism is. - // It was introduced in https://github.com/cockroachdb/cockroach/pull/35261, - // and perhaps could be phased out again if we also did - // https://github.com/cockroachdb/cockroach/issues/21849. Historical - // evidence points to https://github.com/cockroachdb/cockroach/issues/28876 - // as the motivation for introducing this mechanism, i.e. it was about - // reducing failure rates early in the life of a cluster when raft - // leaderships were being determined. Perhaps we could "simply" disable - // async writes unless leadership was stable instead, by blocking on the - // proposal anyway. - // [^4]: https://github.com/cockroachdb/cockroach/blob/ab6a8650621ae798377f12bbfc1eee2fbec95480/pkg/kv/kvserver/replica_application_decoder.go#L100-L114 + // Due to Raft reproposals, multiple in-flight Raft entries can have + // the same CmdIDKey, all corresponding to the same KV request. However, + // not all Raft entries with a given command ID will correspond directly + // to the *RaftCommand contained in its associated *ProposalData. This + // is because the *RaftCommand can be mutated during reproposals by + // Replica.tryReproposeWithNewLeaseIndex. proposals map[kvserverbase.CmdIDKey]*ProposalData // Indicates that the replica is in the process of applying log entries. // Updated to true in handleRaftReady before entries are removed from diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index 53fe8401e12e..c4aeafc59a35 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -111,8 +111,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b // criterion. While such proposals can be reproposed, only the first // instance that gets applied matters and so removing the command is // always what we want to happen. - !cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) - + cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex if shouldRemove { // Delete the proposal from the proposals map. There may be reproposals // of the proposal in the pipeline, but those will all have the same max diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 21681a1c6fc2..c908a24d98c7 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -108,8 +108,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // new one. This is important for pipelined writes, since they don't // have a client watching to retry, so a failure to eventually apply // the proposal would be a user-visible error. 
- pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r)) - + pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd) if pErr != nil { // An error from tryReproposeWithNewLeaseIndex implies that the current // entry is not superseded (i.e. we don't have a reproposal at a higher @@ -134,14 +133,6 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // at time of writing, through TestReplicaReproposalWithNewLeaseIndexError). log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr) cmd.response.Err = pErr - - r.mu.RLock() - _, inMap := r.mu.proposals[cmd.ID] - r.mu.RUnlock() - - if inMap { - log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd) - } } else { // Unbind the entry's local proposal because we just succeeded // in reproposing it and we don't want to acknowledge the client @@ -166,77 +157,36 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { } } -// reproposer is used by tryReproposeWithNewLeaseIndex. -type reproposer interface { - trackEvaluatingRequest(context.Context, hlc.Timestamp) (hlc.Timestamp, TrackedRequestToken) - propose(context.Context, *ProposalData, TrackedRequestToken) *kvpb.Error - newNotLeaseHolderError(string) *kvpb.NotLeaseHolderError -} - -type replicaReproposer Replica - -var _ reproposer = (*replicaReproposer)(nil) - -func (r *replicaReproposer) trackEvaluatingRequest( - ctx context.Context, wts hlc.Timestamp, -) (hlc.Timestamp, TrackedRequestToken) { - // NB: must not hold r.mu here, the propBuf acquires it itself. - return r.mu.proposalBuf.TrackEvaluatingRequest(ctx, wts) -} - -func (r *replicaReproposer) propose( - ctx context.Context, p *ProposalData, tok TrackedRequestToken, -) *kvpb.Error { - return (*Replica)(r).propose(ctx, p, tok) -} - -func (r *replicaReproposer) newNotLeaseHolderError(msg string) *kvpb.NotLeaseHolderError { - r.mu.RLock() - defer r.mu.RUnlock() - return kvpb.NewNotLeaseHolderError( - *r.mu.state.Lease, - r.store.StoreID(), - r.mu.state.Desc, - msg, - ) -} - // tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose // commands that have gotten an illegal lease index error, and that we know // could not have applied while their lease index was valid (that is, we // observed all applied entries between proposal and the lease index becoming // invalid, as opposed to skipping some of them by applying a snapshot). // -// Returns a nil error if the command has already been successfully applied or -// has been reproposed here or by a different entry for the same proposal that -// hit an illegal lease index error. -// -// If this returns a nil error once, it will return a nil error for future calls -// as well, assuming that trackEvaluatingRequest returns monotonically increasing -// timestamps across subsequent calls. -func tryReproposeWithNewLeaseIndex( - ctx context.Context, cmd *replicatedCmd, r reproposer, +// It is not intended for use elsewhere and is only a top-level function so that +// it can avoid the below_raft_protos check. Returns a nil error if the command +// has already been successfully applied or has been reproposed here or by a +// different entry for the same proposal that hit an illegal lease index error. 
+func (r *Replica) tryReproposeWithNewLeaseIndex( + ctx context.Context, cmd *replicatedCmd, ) *kvpb.Error { // Note that we don't need to validate anything about the proposal's // lease here - if we got this far, we know that everything but the // index is valid at this point in the log. p := cmd.proposal - if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) { - // If the command associated with this rejected raft entry already applied - // then we don't want to repropose it. Doing so could lead to duplicate - // application of the same proposal. (We can see hit this case if an application - // batch contains multiple copies of the same proposal, in which case they are - // all marked as local, the first one will apply (and set p.applied) and the - // remaining copies will hit this branch). + if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex { + // If the command associated with this rejected raft entry already + // applied then we don't want to repropose it. Doing so could lead + // to duplicate application of the same proposal. // - // Similarly, if the proposal associated with this rejected raft entry is - // superseded by a different (larger) MaxLeaseIndex than the one we decoded - // from the entry itself, the command must have already passed through - // tryReproposeWithNewLeaseIndex previously (this can happen if there are - // multiple copies of the command in the logs; see - // TestReplicaRefreshMultiple). We must not create multiple copies with - // multiple lease indexes, so don't repropose it again. This ensures that at - // any time, there is only up to a single lease index that has a chance of + // Similarly, if the command associated with this rejected raft + // entry has a different (larger) MaxLeaseIndex than the one we + // decoded from the entry itself, the command must have already + // been reproposed (this can happen if there are multiple copies + // of the command in the logs; see TestReplicaRefreshMultiple). + // We must not create multiple copies with multiple lease indexes, + // so don't repropose it again. This ensures that at any time, + // there is only up to a single lease index that has a chance of // succeeding in the Raft log for a given command. return nil } @@ -245,31 +195,27 @@ func tryReproposeWithNewLeaseIndex( // it gets reproposed. // TODO(andrei): Only track if the request consults the ts cache. Some // requests (e.g. EndTxn) don't care about closed timestamps. - minTS, tok := r.trackEvaluatingRequest(ctx, p.Request.WriteTimestamp()) + minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp()) defer tok.DoneIfNotMoved(ctx) // NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp. if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) { // The tracker wants us to forward the request timestamp, but we can't // do that without re-evaluating, so give up. The error returned here - // will go back to DistSender, so send something it can digest. - return kvpb.NewError(r.newNotLeaseHolderError("reproposal failed due to closed timestamp")) + // will go to back to DistSender, so send something it can digest. + err := kvpb.NewNotLeaseHolderError( + *r.mu.state.Lease, + r.store.StoreID(), + r.mu.state.Desc, + "reproposal failed due to closed timestamp", + ) + return kvpb.NewError(err) } // Some tests check for this log message in the trace. log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") - // Reset the command for reproposal. 
- prevMaxLeaseIndex := p.command.MaxLeaseIndex - prevEncodedCommand := p.encodedCommand - p.command.MaxLeaseIndex = 0 - p.encodedCommand = nil pErr := r.propose(ctx, p, tok.Move(ctx)) if pErr != nil { - // On error, reset the fields we zeroed out to their old value. - // This ensures that the proposal doesn't count as Superseded - // now. - p.command.MaxLeaseIndex = prevMaxLeaseIndex - p.encodedCommand = prevEncodedCommand return pErr } log.VEventf(ctx, 2, "reproposed command %x", cmd.ID) diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index baf87b49268f..2615b0cf6cde 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -223,24 +223,12 @@ func (sm *replicaStateMachine) ApplySideEffects( sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult) } - if higherReproposalsExist := cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex); higherReproposalsExist { - // If the command wasn't rejected, we just applied it and no higher - // reproposal must exist (since that one may also apply). - // - // If the command was rejected with ProposalRejectionPermanent, no higher - // reproposal should exist (after all, whoever made that reproposal should - // also have seen a permanent rejection). - // - // If it was rejected with ProposalRejectionIllegalLeaseIndex, then the - // subsequent call to tryReproposeWithNewLeaseIndex[^1] must have returned an - // error (or the proposal would not be IsLocal() now). But that call - // cannot return an error for a proposal that is already superseded - // initially. - // - // [^1]: see (*replicaDecoder).retrieveLocalProposals() - log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index: %+v", cmd) + rejected := cmd.Rejected() + higherReproposalsExist := cmd.Cmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex + if !rejected && higherReproposalsExist { + log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") } - if !cmd.Rejected() && cmd.proposal.applied { + if !rejected && cmd.proposal.applied { // If the command already applied then we shouldn't be "finishing" its // application again because it should only be able to apply successfully // once. We expect that when any reproposal for the same command attempts @@ -248,6 +236,18 @@ func (sm *replicaStateMachine) ApplySideEffects( // index check in checkForcedErr. log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) } + // If any reproposals at a higher MaxLeaseIndex exist we know that they will + // never successfully apply, remove them from the map to avoid future + // reproposals. If there is no command referencing this proposal at a higher + // MaxLeaseIndex then it will already have been removed (see + // shouldRemove in replicaDecoder.retrieveLocalProposals()). It is possible + // that a later command in this batch referred to this proposal but it must + // have failed because it carried the same MaxLeaseIndex. 
+ if higherReproposalsExist { + sm.r.mu.Lock() + delete(sm.r.mu.proposals, cmd.ID) + sm.r.mu.Unlock() + } cmd.proposal.applied = true } return cmd, nil diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 4ece23897ff3..3b3084c7816c 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -127,20 +127,6 @@ type ProposalData struct { tok TrackedRequestToken } -// Supersedes takes the MaxLeaseIndex of a RaftCommand obtained from a log -// entry. It returns true if the ProposalData tracks a different MaxIndex, -// implying that the log entry has been reproposed under an updated -// MaxLeaseIndex. This implies that the current log entry will have been -// rejected and should not be reproposed. -// -// Note that some commands such as lease requests (but not transfers) don't use -// MaxLeaseIndex. For these, false will be returned. -// -// See (*Replica).mu.proposals for a detailed explanation of reproposals. -func (proposal *ProposalData) Supersedes(entryMaxLeaseIndex uint64) bool { - return proposal.command.MaxLeaseIndex != entryMaxLeaseIndex -} - // finishApplication is called when a command application has finished. The // method will be called downstream of Raft if the command required consensus, // but can be called upstream of Raft if the command did not and was never diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e9c1b16cb02f..e5fe3e3ec6dd 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -317,45 +317,32 @@ func (r *Replica) evalAndPropose( // propose encodes a command, starts tracking it, and proposes it to Raft. // -// On success, the method hands ownership of the command over to the Raft -// machinery. After the method returns with a nil error, all access to the -// command must be performed while holding Replica.mu and Replica.raftMu. -// If the method returns with an error, the error is permanent for the -// proposal, that is, the caller must notify the client that the proposal -// failed and the client can retry, making a new proposal in the process. +// The method hands ownership of the command over to the Raft machinery. After +// the method returns, all access to the command must be performed while holding +// Replica.mu and Replica.raftMu. // -// propose takes ownership of the supplied token, even on error; the caller -// should tok.Move() it into this method. It will be used to untrack the request -// once it comes out of the proposal buffer. If the method returns with an error, -// the token is released, since, as explained above, an error is permanent. -// -// The ProposalData must not be reproposed or reused should an error be returned -// from this method. Its MaxLeaseIndex and encodedCommand fields must be empty. -// Reproposals are a rich source of complexity. See the comment on `r.mu.proposals` -// for details. +// propose takes ownership of the supplied token; the caller should tok.Move() +// it into this method. It will be used to untrack the request once it comes out +// of the proposal buffer. func (r *Replica) propose( ctx context.Context, p *ProposalData, tok TrackedRequestToken, ) (pErr *kvpb.Error) { defer tok.DoneIfNotMoved(ctx) - defer func() { - // An error for this method + // If an error occurs reset the command's MaxLeaseIndex to its initial value. + // Failure to propose will propagate to the client. 
An invariant of this + // package is that proposals which are finished carry a raft command with a + // MaxLeaseIndex equal to the proposal command's max lease index. + defer func(prev uint64) { if pErr != nil { - p.encodedCommand = nil + p.command.MaxLeaseIndex = prev } - }() + }(p.command.MaxLeaseIndex) - if p.command.MaxLeaseIndex > 0 { - // MaxLeaseIndex should not be populated now. It is set only when the proposal buffer - // flushes this proposal into the local raft instance. - return kvpb.NewError(errors.AssertionFailedf("MaxLeaseIndex set: %+v", p)) - } - if p.encodedCommand != nil { - // This indicates someone took an existing proposal and handed it to this method - // again. The caller needs to properly reset the proposal if they're going to do - // that. - return kvpb.NewError(errors.AssertionFailedf("encodedCommand set: %+v", p)) - } + // Make sure the maximum lease index is unset. This field will be set in + // propBuf.Insert and its encoded bytes will be appended to the encoding + // buffer as a MaxLeaseFooter. + p.command.MaxLeaseIndex = 0 // Determine the encoding style for the Raft command. prefix := true From 7b981d6ec59a8f7a6a6176ec81990e96367b981b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 13 Mar 2023 12:41:48 +0100 Subject: [PATCH 08/14] kvserver: resuscitate comment on `r.mu.proposals` Epic: none Release note: None --- pkg/kv/kvserver/replica.go | 155 ++++++++++++++++++++++++++++++++++--- 1 file changed, 146 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 8f4ff92792e3..61f519a0589a 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -501,9 +501,12 @@ type Replica struct { // Instead, the buffer internally holds a reference to mu and will use // it appropriately. proposalBuf propBuf - // proposals stores the Raft in-flight commands which originated at - // this Replica, i.e. all commands for which propose has been called, - // but which have not yet applied. + // proposals stores the Raft in-flight commands which originated at this + // Replica, i.e. all commands for which propose has been called, but which + // have not yet applied. A proposal is "pending" until it is "finalized", + // meaning that `finishApplication` has been invoked on the proposal (which + // informs the client that the proposal has now been applied, optionally + // with an error, which may be an AmbiguousResultError). // // The *ProposalData in the map are "owned" by it. Elements from the // map must only be referenced while the Replica.mu is held, except @@ -512,12 +515,146 @@ type Replica struct { // underneath raft. See comments on ProposalData fields for synchronization // requirements. // - // Due to Raft reproposals, multiple in-flight Raft entries can have - // the same CmdIDKey, all corresponding to the same KV request. However, - // not all Raft entries with a given command ID will correspond directly - // to the *RaftCommand contained in its associated *ProposalData. This - // is because the *RaftCommand can be mutated during reproposals by - // Replica.tryReproposeWithNewLeaseIndex. + // Due to Raft reproposals, multiple in-flight Raft entries can have the + // same CmdIDKey. There are two kinds of reproposals: + // + // (1) the exact same entry is handed to raft (possibly despite already being + // present in the log), usually after a timeout[^1]. + // + // (2) an existing proposal is updated with a new MaxLeaseIndex and handed to + // raft, i.e. we're intentionally creating a duplicate. 
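For context on the propose() change restored by PATCH 07/14 above, the save, zero, restore pattern it relies on can be sketched in isolation as follows. The names here are simplified placeholders rather than the real kvserver API: the proposal buffer is modeled as a plain callback and only the MaxLeaseIndex bookkeeping is shown.

    package main

    import (
    	"errors"
    	"fmt"
    )

    type raftCommand struct{ maxLeaseIndex uint64 }

    type proposalData struct{ command raftCommand }

    // propose zeroes MaxLeaseIndex so that the proposal buffer (the insert
    // callback) can assign a fresh one when it flushes the proposal. If
    // inserting fails, the deferred closure restores the previous value,
    // keeping the in-memory proposal consistent with the copies already in
    // the raft log.
    func propose(p *proposalData, insert func(*proposalData) error) (err error) {
    	defer func(prev uint64) {
    		if err != nil {
    			p.command.maxLeaseIndex = prev
    		}
    	}(p.command.maxLeaseIndex)

    	p.command.maxLeaseIndex = 0
    	return insert(p)
    }

    func main() {
    	p := &proposalData{command: raftCommand{maxLeaseIndex: 7}}
    	err := propose(p, func(*proposalData) error { return errors.New("proposal buffer full") })
    	fmt.Println(err, p.command.maxLeaseIndex) // the old index (7) is restored on error
    }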
This exists because + // for pipelined proposals, the client's goroutine returns without waiting + // for the proposal to apply.[^2][^3] When (2) is carried out, the existing + // copies of the proposal in the log will be "Superseded", see below. Note + // that (2) will only be invoked for proposals that aren't currently in the + // proposals map any more because they're in the middle of being applied; + // as part of (2), they are re-added to the map. + // + // To understand reproposals, we need a broad overview of entry application, + // which is batched (i.e. may process multiple log entries to be applied in + // a batched fashion). In entry application, the following steps are taken: + // + // 1. retrieve all local proposals: iterate through the entries in order, + // and look them up in the proposals map. For each "local" entry (i.e. + // tracked in the map), remove it from the map (unless the proposal + // is not superseded, see below) and attach the value to the entry. + // 2. for each entry: + // - stage written and in-memory effects of the entry (some may apply as no-ops + // if they fail below-raft checks such as the MaxLeaseIndex check) + // - Assuming the MaxLeaseIndex is violated and additional constraints are + // satisfied, carry out (2) from above. On success, we know now that there + // will be a reproposal in the log that can successfully apply. We unbind + // the local proposal (so we don't signal it) and apply the current entry + // as a no-op. + // 3. carry out additional side effects of the entire batch (stats updates etc). + // + // A prerequisite for (2) is that there currently aren't any copies of the proposal + // in the log that may ultimately apply, or we risk doubly applying commands - a + // correctness bug. After (2), any copies of the entry present in the log will have + // a MaxLeaseIndex strictly less than that of the in-memory command, and will be + // Superseded() by it. + // + // We can always safely create an identical copy (i.e. (1)) because of the + // replay protection conferred by the MaxLeaseIndex - all but the first + // proposal (that reach consensus) will be rejected (i.e. apply as a no-op). + // + // Naively, one might hope that by invoking (2) upon applying an entry for + // a command that is rejected due to a MaxLeaseIndex one could achieve the + // invariant that there is only ever one unapplied copy of the entry in the + // log, and then the in-memory proposal could reflect the MaxLeaseIndex + // assigned to this unapplied copy at all times. + // + // Unfortunately, for various reasons, this invariant does not hold: + // - entry application isn't durable, so upon a restart, we might roll + // back to a log position that yet has to catch up over multiple previous + // incarnations of (2), i.e. we will see the same entry multiple times at + // various MaxLeaseIndex values. + // (This technically not a problem, since we're losing the in-memory proposal + // during the restart anyway, but should be kept in mind anyway). + // - Raft proposal forwarding due to (1)-type reproposals could "in + // principle" lead to an old copy of the entry appearing again in the + // unapplied log, at least if we make the reasonable assumption that + // forwarded proposals may arrive at the leader with arbitrary delays. 
+ // + // As a result, we can't "just" invoke (2) when seeing a rejected command, + // we additionally have to verify that there isn't a more recent reproposal + // underway that could apply successfully and supersedes the one we're + // currently looking at. + // So we carry out (2) only if the MaxLeaseIndex of the in-mem proposal matches + // that of the current entry, and update the in-mem MaxLeaseIndex with the result + // of (2) if it did. + // + // An example follows. Consider the following situation (where N is some base + // index not relevant to the example) in which we have one inflight proposal which + // has been triplicated in the log (due to [^1]): + // + // proposals[id] = p{Cmd{MaxLeaseIndex: 100, ...}} + // + // ... (unrelated entries) + // raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} + // ... (unrelated entries) + // raftlog[N+12] = (same as N) + // ... (unrelated entries) + // raftlog[N+15] = (same as N) + // + // where we assume that the `MaxLeaseIndex` 100 is invalid, i.e. when we see + // the first copy of the command being applied, we've already applied some + // command with equal or higher `MaxLeaseIndex`. In a world without + // mechanism (2), `N` would be rejected, and would finalize the proposal + // (i.e. signal the client with an error and remove the entry from + // `proposals`). Later, `N+12` and `N+15` would similarly be rejected (but + // they wouldn't even be regarded as local proposals any more due to not + // being present in `proposals`). + // + // However, (2) exists and it will engage during application of `N`: realizing + // that the current copies of the entry are all going to be rejected, it will + // modify the proposal by assigning a new `MaxLeaseIndex` to it, and handing + // it to `(*Replica).propose` again (which hands it to the proposal buffer, + // which will at some point flush it, leading to re-insertion into the raft + // log and the `proposals` map). The result will be this picture: + // + // proposals[id] = p{Cmd{MaxLeaseIndex: 192, ...}} <-- modified + // + // ... (unrelated entries) + // raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} <-- applied (as no-op) + // ... (unrelated entries) + // raftlog[N+12] = (same as N) (superseded) + // ... (unrelated entries) + // raftlog[N+15] = (same as N) (superseded) + // ... (unrelated entries) + // raftlog[N+18] = Cmd{MaxLeaseIndex: 192, ...} <-- modified + // + // `N+18` might (in fact, is likely to) apply successfully. As a result, when + // we consider `N+12` or `N+15` for application, we must *not* carry out (2) + // again, or we break replay protection. In other words, the `MaxLeaseIndex` + // of the command being applied must be compared with the `MaxLeaseIndex` of + // the command in the proposals map; only if they match do we know that this + // is the most recent (in MaxLeaseIndex order) copy of the command, and only + // then can (2) engage. In addition, an entry that doesn't pass this equality + // check must not signal the proposer and/or unlink from the proposals map (as a + // newer reproposal which might succeed is likely in the log)[^4]. + // + // Another way of framing the above is that `proposals[id].Cmd.MaxLeaseIndex` + // actually tracks the maximum `MaxLeaseIndex` of all copies that may be present in + // the log. + // + // If (2) results in an error (for example, since the proposal now fails to + // respect the closed timestamp), that error will finalize the proposal and + // is returned to the client. 
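The supersede check that the comment keeps returning to boils down to a single comparison on MaxLeaseIndex. The following minimal, hypothetical Go sketch (only MaxLeaseIndex and the notion of "supersedes" come from the patch; every other name is invented for illustration) shows the semantics the worked example relies on:

package main

import "fmt"

// supersedes reports whether an in-memory proposal reproposed at inMemMLI
// supersedes a log entry that was encoded at entryMLI: every older copy in
// the log carries a strictly smaller MaxLeaseIndex and must apply as a no-op
// without triggering reproposal mechanism (2) again.
func supersedes(inMemMLI, entryMLI uint64) bool {
	return inMemMLI > entryMLI
}

func main() {
	// Mirroring the example above: the proposal was reproposed at MaxLeaseIndex
	// 192 while stale copies at 100 remain in the log.
	fmt.Println(supersedes(192, 100)) // true: the old copy is superseded and applies as a no-op
	fmt.Println(supersedes(192, 192)) // false: latest copy; only here may (2) engage on rejection
}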
+	//
+	// [^1]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_raft.go#L1224
+	// [^2]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_application_result.go#L148
+	// [^3]: it's debatable how useful this below-raft reproposal mechanism is.
+	// It was introduced in https://github.com/cockroachdb/cockroach/pull/35261,
+	// and perhaps could be phased out again if we also did
+	// https://github.com/cockroachdb/cockroach/issues/21849. Historical
+	// evidence points to https://github.com/cockroachdb/cockroach/issues/28876
+	// as the motivation for introducing this mechanism, i.e. it was about
+	// reducing failure rates early in the life of a cluster when raft
+	// leaderships were being determined. Perhaps we could "simply" disable
+	// async writes unless leadership was stable instead, by blocking on the
+	// proposal anyway.
+	// [^4]: https://github.com/cockroachdb/cockroach/blob/ab6a8650621ae798377f12bbfc1eee2fbec95480/pkg/kv/kvserver/replica_application_decoder.go#L100-L114
 	proposals map[kvserverbase.CmdIDKey]*ProposalData
 	// Indicates that the replica is in the process of applying log entries.
 	// Updated to true in handleRaftReady before entries are removed from

From 93a59b4190016f6c5d981736210d08ae514815aa Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Mon, 13 Mar 2023 12:46:53 +0100
Subject: [PATCH 09/14] kvserver: resuscitate comments in prepareLocalResult

Epic: none
Release note: None
---
 pkg/kv/kvserver/replica_application_result.go | 77 +++++++++++++++++--
 1 file changed, 72 insertions(+), 5 deletions(-)

diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go
index c908a24d98c7..3ab270040d2a 100644
--- a/pkg/kv/kvserver/replica_application_result.go
+++ b/pkg/kv/kvserver/replica_application_result.go
@@ -104,10 +104,65 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 	case kvserverbase.ProposalRejectionPermanent:
 		cmd.response.Err = pErr
 	case kvserverbase.ProposalRejectionIllegalLeaseIndex:
+		// Reset the error as it's now going to be determined by the outcome of
+		// reproposing (or not); note that tryReproposeWithNewLeaseIndex will
+		// return `nil` if the entry is not eligible for reproposals.
+		//
+		// If pErr gets "reset" here as a result, we will mark the proposal as
+		// non-local at the end of this block and return, so we're not hitting an
+		// NPE near the end of this method where we're attempting to reach into
+		// `cmd.proposal`.
+		//
+		// This control flow is sketchy but it preserves existing behavior
+		// that would be too risky to change at the time of writing.
+		//
 		// If we failed to apply at the right lease index, try again with a
 		// new one. This is important for pipelined writes, since they don't
 		// have a client watching to retry, so a failure to eventually apply
 		// the proposal would be a user-visible error.
+		//
+		// If the command associated with this rejected raft entry already applied
+		// then we don't want to repropose it. Doing so could lead to duplicate
+		// application of the same proposal. (We can hit this case if an application
+		// batch contains multiple copies of the same proposal, in which case they are
+		// all marked as local, the first one will apply (and set p.applied) and the
+		// remaining copies will hit this branch).
+		//
+		// Similarly, if the proposal associated with this rejected raft entry is
+		// superseded by a different (larger) MaxLeaseIndex than the one we decoded
+		// from the entry itself, the command must have already passed through
+		// tryReproposeWithNewLeaseIndex previously (this can happen if there are
+		// multiple copies of the command in the logs; see
+		// TestReplicaRefreshMultiple). We must not create multiple copies with
+		// multiple lease indexes, so don't repropose it again. This ensures that at
+		// any time, there is only up to a single lease index that has a chance of
+		// succeeding in the Raft log for a given command.
+		//
+		// Taking a looking glass to the last paragraph, we see that the situation
+		// is much more subtle. As tryReproposeWithNewLeaseIndex gets to the stage
+		// where it calls `(*Replica).propose` (i.e. it passed the closed
+		// timestamp checks), it *resets* `cmd.proposal.MaxLeaseIndex` to zero.
+		// This means that the proposal immediately supersedes any other copy
+		// presently in the log, including for the remainder of application of the
+		// current log entry (since the current entry's LAI is certainly not equal
+		// to zero). However, the proposal buffer adds another layer of
+		// possibility on top of this. Typically, the buffer does not flush
+		// immediately: this only happens at the beginning of the *next* raft
+		// handling cycle, i.e. the proposal will not re-enter the proposals map
+		// while the current batches of commands (recall, which may contain an
+		// arbitrary number of copies of the current command, both with various
+		// LAIs all but at most one of which are too small) are applied. *But*,
+		// the proposal buffer has a capacity, and if it does fill up in
+		// `(*Replica).propose` it will synchronously flush, meaning that a new
+		// LAI will be assigned to `cmd.proposal.MaxLeaseIndex` AND the command
+		// will have re-entered the proposals map. So even if we remove the
+		// "zeroing" of the MaxLeaseIndex prior to proposing, we still have to
+		// contend with the fact that, once `tryReproposeWithNewLeaseIndex` has
+		// been called, the command may or may not be in the map. (At least it
+		// is assigned a new LAI if and only if it is back in the map!).
+		//
+		// These many possible worlds are a major source of complexity, a
+		// reduction of which is postponed.
 		pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
 		if pErr != nil {
 			// An error from tryReproposeWithNewLeaseIndex implies that the current
@@ -126,11 +181,23 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 			// It is thus safe to signal the error back to the client, which is also
 			// the only sensible choice at this point.
 			//
-			// We also know that the proposal is not in the proposals map, since the
-			// command is local and wasn't superseded, which is the condition in
-			// retrieveLocalProposals for removing from the map. So we're not leaking
-			// a map entry here, which we assert against below (and which has coverage,
-			// at time of writing, through TestReplicaReproposalWithNewLeaseIndexError).
+			// Note that the proposal may or may not be in the proposals map at this
+			// point. For example, if we artificially inject invalid LAIs during
+			// proposal, see the explanatory comment above. If, in the current app
+			// batch, we had two identical copies of an entry (which maps to a local
+			// proposal), and the LAI was stale, then both entries would be local.
+			// The first could succeed in reproposing, which, if the propBuf was full,
+			// would immediately insert into the proposals map. Normally, when we
+			// then apply the second entry, it would be superseded and not hit the
+			// assertion. But, if we injected a stale LAI during this last
+			// reproposal, we could "accidentally" assign the same LAI again. The
+			// second entry would then try to repropose again, which is fine, but it
+			// could bump into the closed timestamp, get an error, and now we are in
+			// a situation where a reproposal attempt failed with the proposal being
+			// present in the map.
+			//
+			// For proposed simplifications, see:
+			// https://github.com/cockroachdb/cockroach/issues/97633
 			log.Infof(ctx, "failed to repropose %s at idx %d with new lease index: %s", cmd.ID, cmd.Index(), pErr)
 			cmd.response.Err = pErr
 		} else {

From ebc5be995400915c65bc1a2e66f1c735fa1e897e Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Mon, 13 Mar 2023 12:49:49 +0100
Subject: [PATCH 10/14] kvserver: bring back a helpful issue reference

Epic: none
Release note: None
---
 pkg/kv/kvserver/replica_proposal_buf.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go
index 025ced87aa30..ba86d233489f 100644
--- a/pkg/kv/kvserver/replica_proposal_buf.go
+++ b/pkg/kv/kvserver/replica_proposal_buf.go
@@ -1191,6 +1191,10 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
 	if prev := rp.mu.proposals[p.idKey]; prev != nil && prev != p {
 		log.Fatalf(rp.store.AnnotateCtx(context.Background()),
 			"two proposals under same ID:\n%+v,\n%+v", prev, p)
 	}
+	// NB: we can see finished proposals inserted here. We don't like it but
+	// it's currently possible.
+	//
+	// See: https://github.com/cockroachdb/cockroach/issues/97605
 	rp.mu.proposals[p.idKey] = p
 }

From 73170081af73c63a6f5750661d14aedd31b1c304 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Mon, 13 Mar 2023 12:56:54 +0100
Subject: [PATCH 11/14] kvserver: resuscitate some detail on `(*Replica).propose`

Epic: none
Release note: None
---
 pkg/kv/kvserver/replica_raft.go | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index e5fe3e3ec6dd..7051fb3e66d8 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -324,6 +324,18 @@ func (r *Replica) evalAndPropose(
 // propose takes ownership of the supplied token; the caller should tok.Move()
 // it into this method. It will be used to untrack the request once it comes out
 // of the proposal buffer.
+//
+// Note that this method is called for "new" proposals but also by
+// `tryReproposeWithNewLeaseIndex`. This second call leaves questions on what
+// exactly the desired semantics are - some fields (MaxLeaseIndex,
+// ClosedTimestamp) will be set and this re-entrance into `propose`
+// is hard to fully understand. (The reset of `MaxLeaseIndex` inside this
+// method is a fear-fueled but likely unneeded consequence of this).
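The reset-and-restore dance around MaxLeaseIndex that this comment alludes to is easiest to see in miniature. Below is a hedged, self-contained Go sketch of the defer-based rollback pattern used earlier in the series; only the MaxLeaseIndex field name comes from the patches, every other name is invented:

package main

import (
	"errors"
	"fmt"
)

// raftCmd stands in for the proposal's raft command in this sketch.
type raftCmd struct{ MaxLeaseIndex uint64 }

// assign sets a new lease index and rolls it back if the attempt fails, so a
// finished proposal keeps a MaxLeaseIndex consistent with what was (or was
// not) handed to raft.
func assign(c *raftCmd, next uint64, fail bool) (err error) {
	defer func(prev uint64) {
		if err != nil {
			c.MaxLeaseIndex = prev // restore the previous value on error
		}
	}(c.MaxLeaseIndex)
	c.MaxLeaseIndex = next
	if fail {
		return errors.New("proposal buffer flush failed")
	}
	return nil
}

func main() {
	c := &raftCmd{MaxLeaseIndex: 100}
	_ = assign(c, 192, true)
	fmt.Println(c.MaxLeaseIndex) // 100: rolled back after the failed attempt
	_ = assign(c, 192, false)
	fmt.Println(c.MaxLeaseIndex) // 192: the successful attempt's index sticks
}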
+//
+// TODO(repl): adopt the below issue which will see each proposal passed to this
+// method exactly once:
+//
+// https://github.com/cockroachdb/cockroach/issues/98477
 func (r *Replica) propose(
 	ctx context.Context, p *ProposalData, tok TrackedRequestToken,
 ) (pErr *kvpb.Error) {

From 05f5bf234f61dbcc40edd8733b200db2f4e4e81e Mon Sep 17 00:00:00 2001
From: Chengxiong Ruan
Date: Mon, 13 Mar 2023 17:19:33 -0400
Subject: [PATCH 12/14] sql: check row level ttl change before truncating a table

Fixes: #93443

Release note (sql change): This commit fixed a bug where crdb panicked when
a user tried to truncate a table which has an ongoing row level ttl change.
We still don't support table truncates in this scenario, but a gentler
unimplemented error is returned instead of a panic.
---
 .../tests/3node-tenant/generated_test.go      |   7 +
 .../truncate_with_concurrent_mutation         | 123 ++++++++++++++++++
 .../tests/fakedist-disk/generated_test.go     |   7 +
 .../tests/fakedist-vec-off/generated_test.go  |   7 +
 .../tests/fakedist/generated_test.go          |   7 +
 .../generated_test.go                         |   7 +
 .../tests/local-vec-off/generated_test.go     |   7 +
 .../logictest/tests/local/generated_test.go   |   7 +
 pkg/sql/truncate.go                           |   5 +
 9 files changed, 177 insertions(+)
 create mode 100644 pkg/sql/logictest/testdata/logic_test/truncate_with_concurrent_mutation

diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go
index 5ef453cac7ba..5fe17a94f130 100644
--- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go
+++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go
@@ -1984,6 +1984,13 @@ func TestTenantLogic_truncate(
 	t *testing.T,
 ) {
 	defer leaktest.AfterTest(t)()
 	runLogicTest(t, "truncate")
 }
 
+func TestTenantLogic_truncate_with_concurrent_mutation(
+	t *testing.T,
+) {
+	defer leaktest.AfterTest(t)()
+	runLogicTest(t, "truncate_with_concurrent_mutation")
+}
+
 func TestTenantLogic_tsvector(
 	t *testing.T,
 ) {
diff --git a/pkg/sql/logictest/testdata/logic_test/truncate_with_concurrent_mutation b/pkg/sql/logictest/testdata/logic_test/truncate_with_concurrent_mutation
new file mode 100644
index 000000000000..055b19053f01
--- /dev/null
+++ b/pkg/sql/logictest/testdata/logic_test/truncate_with_concurrent_mutation
@@ -0,0 +1,123 @@
+statement ok
+SET CLUSTER SETTING jobs.registry.interval.adopt = '50ms';
+
+statement ok
+SET CLUSTER SETTING jobs.registry.interval.cancel = '50ms'
+
+# Make sure that table cannot be truncated if an index is being dropped
+# concurrently.
+statement ok
+CREATE TABLE t1(a int primary key, b int);
+
+statement ok
+CREATE INDEX idx_b ON t1(b);
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec,newschemachanger.before.exec';
+
+statement error pq: job \d+ was paused before it completed with reason: pause point
+DROP INDEX t1@idx_b;
+
+statement error pq: unimplemented: cannot perform TRUNCATE on "t1" which has indexes being dropped
+TRUNCATE TABLE t1;
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+
+# Make sure that table cannot be truncated if a column using a UDT is being
+# dropped concurrently.
+statement ok
+CREATE TYPE e AS ENUM ('v1', 'v2');
+CREATE TABLE t2(a int primary key, b e);
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec,newschemachanger.before.exec';
+
+statement error pq: job \d+ was paused before it completed with reason: pause point
+ALTER TABLE t2 DROP COLUMN b;
+
+statement error pq: unimplemented: cannot perform TRUNCATE on "t2" which has a column \("\w+"\) being dropped which depends on another object
+TRUNCATE TABLE t2;
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+
+# Make sure that table cannot be truncated when a constraint without index is
+# being added concurrently.
+statement ok
+CREATE TABLE t3(a INT PRIMARY KEY, b INT);
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec,newschemachanger.before.exec';
+
+statement error pq: job \d+ was paused before it completed with reason: pause point
+ALTER TABLE t3 ADD CONSTRAINT ckb CHECK (b > 3);
+
+statement error pq: unimplemented: cannot perform TRUNCATE on "t3" which has an ongoing CHECK constraint change
+TRUNCATE TABLE t3;
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+
+# Make sure table cannot be truncated if there is a concurrent primary key change.
+statement ok
+CREATE TABLE t4(a INT PRIMARY KEY, b INT NOT NULL);
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec,newschemachanger.before.exec';
+
+statement error pq: job \d+ was paused before it completed with reason: pause point
+ALTER TABLE t4 ALTER PRIMARY KEY USING COLUMNS (b);
+
+# In the declarative schema changer we don't generate a primary key swap mutation.
+# So the truncate would wait for the concurrent schema change to finish and
+# hang this test. So we need to test this only in the legacy schema changer.
+onlyif config local-legacy-schema-changer
+statement error pq: unimplemented: cannot perform TRUNCATE on "t4" which has an ongoing primary key change
+TRUNCATE TABLE t4;
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+
+# Make sure table cannot be truncated when there is a concurrent column type
+# change.
+statement ok
+CREATE TABLE t5(a int primary key, b int);
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec,newschemachanger.before.exec';
+
+statement ok
+SET enable_experimental_alter_column_type_general = true;
+
+statement error pq: job \d+ was paused before it completed with reason: pause point
+ALTER TABLE t5 ALTER COLUMN b TYPE STRING;
+
+statement error pq: unimplemented: cannot perform TRUNCATE on "t5" which has an ongoing column type change
+TRUNCATE TABLE t5;
+
+statement ok
+SET CLUSTER SETTING jobs.debug.pausepoints = '';
+
+# Make sure that table cannot be truncated if there is an ongoing row level TTL
+# change.
+statement ok +CREATE TABLE t6(a int primary key, b int); + +statement ok +SET CLUSTER SETTING jobs.debug.pausepoints = 'schemachanger.before.exec,newschemachanger.before.exec'; + +statement error pq: job \d+ was paused before it completed with reason: pause point +ALTER TABLE t6 SET (ttl_expire_after = '00:10:00':::INTERVAL); + +statement error pq: unimplemented: cannot perform TRUNCATE on "t6" which has an ongoing row level TTL change +TRUNCATE TABLE t6; + +statement ok +SET CLUSTER SETTING jobs.debug.pausepoints = ''; + +statement ok +RESUME JOBS (SELECT job_id FROM crdb_internal.jobs WHERE status = 'paused'); + +statement ok +USE test; diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go index 60e9c080d7f9..a0a2264d5f9c 100644 --- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go @@ -1948,6 +1948,13 @@ func TestLogic_truncate( runLogicTest(t, "truncate") } +func TestLogic_truncate_with_concurrent_mutation( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "truncate_with_concurrent_mutation") +} + func TestLogic_tsvector( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go index 496b46e5502c..de360899629f 100644 --- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go @@ -1955,6 +1955,13 @@ func TestLogic_truncate( runLogicTest(t, "truncate") } +func TestLogic_truncate_with_concurrent_mutation( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "truncate_with_concurrent_mutation") +} + func TestLogic_tsvector( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go index 370cb806df7a..c1d180b29999 100644 --- a/pkg/sql/logictest/tests/fakedist/generated_test.go +++ b/pkg/sql/logictest/tests/fakedist/generated_test.go @@ -1969,6 +1969,13 @@ func TestLogic_truncate( runLogicTest(t, "truncate") } +func TestLogic_truncate_with_concurrent_mutation( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "truncate_with_concurrent_mutation") +} + func TestLogic_tsvector( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go index ecbf90f63086..fb0844d59f94 100644 --- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go +++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go @@ -1934,6 +1934,13 @@ func TestLogic_truncate( runLogicTest(t, "truncate") } +func TestLogic_truncate_with_concurrent_mutation( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "truncate_with_concurrent_mutation") +} + func TestLogic_tsvector( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go index 64df77d1252d..66da5b2ab4cc 100644 --- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go +++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go @@ -1962,6 +1962,13 @@ func TestLogic_truncate( runLogicTest(t, "truncate") } +func TestLogic_truncate_with_concurrent_mutation( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "truncate_with_concurrent_mutation") +} + func 
TestLogic_tsvector( t *testing.T, ) { diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go index f83880ccbb4c..820899609e9f 100644 --- a/pkg/sql/logictest/tests/local/generated_test.go +++ b/pkg/sql/logictest/tests/local/generated_test.go @@ -2151,6 +2151,13 @@ func TestLogic_truncate( runLogicTest(t, "truncate") } +func TestLogic_truncate_with_concurrent_mutation( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runLogicTest(t, "truncate_with_concurrent_mutation") +} + func TestLogic_tsvector( t *testing.T, ) { diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index 48a6a736ff62..ff67bf0dd2b5 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -383,6 +383,11 @@ func checkTableForDisallowedMutationsWithTruncate(desc *tabledesc.Mutable) error "TRUNCATE concurrent with ongoing schema change", "cannot perform TRUNCATE on %q which has an ongoing column type "+ "change", desc.GetName()) + } else if m.AsModifyRowLevelTTL() != nil { + return unimplemented.Newf( + "TRUNCATE concurrent with ongoing schema change", + "cannot perform TRUNCATE on %q which has an ongoing row level TTL "+ + "change", desc.GetName()) } else { return errors.AssertionFailedf("cannot perform TRUNCATE due to "+ "concurrent unknown mutation of type %T for mutation %d in %v", m, i, desc) From b0508fef98bf3fde3a68358fff6d81825b5b0cfb Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 14 Mar 2023 09:57:06 -0400 Subject: [PATCH 13/14] cdc: use int64 for emitted bytes telemetry Previously, the stored `emitted_bytes` field was an int32, which can hold a maximum value of 2.1GB. This value is too small because the logging period is 24h and changefeeds can emit much more than 2.1GB in 24h. This change updates the field to be an int64, which solves this problem. Epic: None Release note: None --- pkg/ccl/changefeedccl/telemetry.go | 8 ++++---- pkg/util/log/eventpb/telemetry.proto | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index e2027528c026..1788b8789069 100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -69,8 +69,8 @@ func (ptl *periodicTelemetryLogger) recordEmittedBytes(numBytes int) { ptl.sinkTelemetryData.emittedBytes.Add(int64(numBytes)) } -func (ptl *periodicTelemetryLogger) resetEmittedBytes() int { - return int(ptl.sinkTelemetryData.emittedBytes.Swap(0)) +func (ptl *periodicTelemetryLogger) resetEmittedBytes() int64 { + return ptl.sinkTelemetryData.emittedBytes.Swap(0) } // recordEmittedBytes implements the telemetryLogger interface. 
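For context on the overflow this changefeed telemetry patch guards against, here is a small, self-contained Go sketch (the 3 GiB figure is made up) showing why an int32 emitted-bytes counter wraps within a single 24h logging interval while int64 does not:

package main

import (
	"fmt"
	"math"
)

func main() {
	// A changefeed can emit well over math.MaxInt32 (~2.1 GB) bytes between
	// telemetry flushes; narrowing the counter to int32 wraps it into a
	// meaningless (here negative) value, while int64 has ample headroom.
	emitted := int64(3) << 30 // pretend 3 GiB were emitted since the last flush
	fmt.Println(int32(emitted))             // wrapped: -1073741824
	fmt.Println(emitted, int64(math.MaxInt64)) // the true count fits easily in int64
}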
@@ -96,7 +96,7 @@ func (ptl *periodicTelemetryLogger) maybeFlushLogs() { continuousTelemetryEvent := &eventpb.ChangefeedEmittedBytes{ CommonChangefeedEventDetails: ptl.changefeedDetails, JobId: int64(ptl.job.ID()), - EmittedBytes: int32(ptl.resetEmittedBytes()), + EmittedBytes: ptl.resetEmittedBytes(), LoggingInterval: loggingInterval, } log.StructuredEvent(ptl.ctx, continuousTelemetryEvent) @@ -111,7 +111,7 @@ func (ptl *periodicTelemetryLogger) close() { continuousTelemetryEvent := &eventpb.ChangefeedEmittedBytes{ CommonChangefeedEventDetails: ptl.changefeedDetails, JobId: int64(ptl.job.ID()), - EmittedBytes: int32(ptl.resetEmittedBytes()), + EmittedBytes: ptl.resetEmittedBytes(), LoggingInterval: loggingInterval, Closing: true, } diff --git a/pkg/util/log/eventpb/telemetry.proto b/pkg/util/log/eventpb/telemetry.proto index 67c1a8dff1fa..d6cac6cc507b 100644 --- a/pkg/util/log/eventpb/telemetry.proto +++ b/pkg/util/log/eventpb/telemetry.proto @@ -261,7 +261,7 @@ message ChangefeedEmittedBytes { int64 job_id = 2 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""]; // The number of bytes emitted. - int32 emitted_bytes = 3 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""]; + int64 emitted_bytes = 3 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""]; // The time period in nanoseconds between emitting telemetry events of this type (per-aggregator). int64 logging_interval = 4 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""]; From 5fc67c1d691bdc037a5bd45cc6d82b8711f47d8a Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Tue, 14 Mar 2023 10:00:14 -0500 Subject: [PATCH 14/14] ci: allow-list `BUILD_VCS_NUMBER` env var in cloud unit tests This job was filing issues linking to the wrong commit. 
Epic: none Release note: None --- build/teamcity/cockroach/nightlies/cloud_unit_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/teamcity/cockroach/nightlies/cloud_unit_tests.sh b/build/teamcity/cockroach/nightlies/cloud_unit_tests.sh index 1c1052c859bf..669b5f54cb7d 100755 --- a/build/teamcity/cockroach/nightlies/cloud_unit_tests.sh +++ b/build/teamcity/cockroach/nightlies/cloud_unit_tests.sh @@ -8,6 +8,6 @@ source "$dir/teamcity-support.sh" # For $root source "$dir/teamcity-bazel-support.sh" # For run_bazel tc_start_block "Run cloud unit tests" -BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e GITHUB_API_TOKEN -e GITHUB_REPO -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e GOOGLE_EPHEMERAL_CREDENTIALS -e GOOGLE_KMS_KEY_NAME -e GOOGLE_LIMITED_KEY_ID -e ASSUME_SERVICE_ACCOUNT -e GOOGLE_LIMITED_BUCKET -e ASSUME_SERVICE_ACCOUNT_CHAIN -e AWS_DEFAULT_REGION -e AWS_SHARED_CREDENTIALS_FILE -e AWS_CONFIG_FILE -e AWS_S3_BUCKET -e AWS_ASSUME_ROLE -e AWS_ROLE_ARN_CHAIN -e AWS_KMS_KEY_ARN -e AWS_S3_ENDPOINT -e AWS_KMS_ENDPOINT -e AWS_KMS_REGION -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e AZURE_ACCOUNT_KEY -e AZURE_ACCOUNT_NAME -e AZURE_CONTAINER -e AZURE_CLIENT_ID -e AZURE_CLIENT_SECRET -e AZURE_TENANT_ID -e AZURE_VAULT_NAME -e AZURE_KMS_KEY_NAME -e AZURE_KMS_KEY_VERSION" \ +BAZEL_SUPPORT_EXTRA_DOCKER_ARGS="-e GITHUB_API_TOKEN -e GITHUB_REPO -e TC_BUILDTYPE_ID -e TC_BUILD_BRANCH -e TC_BUILD_ID -e TC_SERVER_URL -e GOOGLE_EPHEMERAL_CREDENTIALS -e GOOGLE_KMS_KEY_NAME -e GOOGLE_LIMITED_KEY_ID -e ASSUME_SERVICE_ACCOUNT -e GOOGLE_LIMITED_BUCKET -e ASSUME_SERVICE_ACCOUNT_CHAIN -e AWS_DEFAULT_REGION -e AWS_SHARED_CREDENTIALS_FILE -e AWS_CONFIG_FILE -e AWS_S3_BUCKET -e AWS_ASSUME_ROLE -e AWS_ROLE_ARN_CHAIN -e AWS_KMS_KEY_ARN -e AWS_S3_ENDPOINT -e AWS_KMS_ENDPOINT -e AWS_KMS_REGION -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY -e AZURE_ACCOUNT_KEY -e AZURE_ACCOUNT_NAME -e AZURE_CONTAINER -e AZURE_CLIENT_ID -e AZURE_CLIENT_SECRET -e AZURE_TENANT_ID -e AZURE_VAULT_NAME -e AZURE_KMS_KEY_NAME -e AZURE_KMS_KEY_VERSION -e BUILD_VCS_NUMBER" \ run_bazel build/teamcity/cockroach/nightlies/cloud_unit_tests_impl.sh "$@" tc_end_block "Run cloud unit tests"