From b65d72c5dd9d139c2bddc68344c9676389039097 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Tue, 25 Jun 2019 10:28:14 -0400
Subject: [PATCH] storage: batch command application and coalesce applied
 state per batch
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit batches raft command application where possible. The basic
approach is to notice that many commands only "trivially" update the
replica state machine. Trivial commands can be processed in a single
batch by acting on a copy of the replica state. Non-trivial commands
share the same logic but always commit alone, because they rely, for one
reason or another, on having a view of the replica or storage engine as
of a specific log index.

This commit also sneaks in another optimization which batching enables.
Each command mutates a portion of replica state called the applied
state, which tracks a combination of the log index that has been applied
and the MVCC stats of the range as of that application. Before this
change each entry updated this applied state individually, and each of
those writes would end up in the WAL and memtable just to be compacted
away in L1. Now that commands are applied to the storage engine in a
single batch, it is easy to update the applied state only for the last
entry in the batch.

For sequential writes this patch shows a considerable performance win.
The benchmark below was run on a 3-node c5d.4xlarge (16 vCPU) cluster
with concurrency 128.

```
name            old ops/s   new ops/s   delta
KV0-throughput  22.1k ± 1%  32.8k ± 1%  +48.59%  (p=0.029 n=4+4)

name     old ms/s   new ms/s   delta
KV0-P50  7.15 ± 2%  6.00 ± 0%  -16.08%  (p=0.029 n=4+4)
KV0-Avg  5.80 ± 0%  3.80 ± 0%  -34.48%  (p=0.029 n=4+4)
```

Due to the reorganization of logic in this change, Replica.mu does not
need to be acquired as many times during the application of a batch. In
the common case it is now acquired exactly twice in the process of
applying a batch, whereas before it was acquired more than twice per
entry. This should improve performance on large machines that experience
mutex contention for a single range. The effect is visible in the
results below, from running a normal KV0 workload on c5d.18xlarge
machines (72 vCPU) with concurrency 1024 and 16 initial splits.

```
name            old ops/s   new ops/s    delta
KV0-throughput  78.1k ± 1%  116.8k ± 5%  +49.42%  (p=0.029 n=4+4)

name     old ms/s   new ms/s   delta
KV0-P50  24.4 ± 3%  19.7 ± 7%  -19.28%  (p=0.029 n=4+4)
KV0-Avg  12.6 ± 0%   7.5 ± 9%  -40.87%  (p=0.029 n=4+4)
```

Fixes #37426.

Release note (performance improvement): Batch raft entry application and
coalesce writes to applied state for the batch.
---
 pkg/storage/entry_application_state_buf.go |  166 ++
 .../entry_application_state_buf_test.go    |   44 +
 pkg/storage/replica_application.go         | 1394 +++++++++++++++++
 pkg/storage/replica_proposal.go            |  267 ----
 pkg/storage/replica_raft.go                |  813 +---------
 pkg/storage/replica_test.go                |    3 +-
 pkg/storage/track_raft_protos.go           |    4 +-
 7 files changed, 1621 insertions(+), 1070 deletions(-)
 create mode 100644 pkg/storage/entry_application_state_buf.go
 create mode 100644 pkg/storage/entry_application_state_buf_test.go
 create mode 100644 pkg/storage/replica_application.go

diff --git a/pkg/storage/entry_application_state_buf.go b/pkg/storage/entry_application_state_buf.go
new file mode 100644
index 000000000000..b68258758753
--- /dev/null
+++ b/pkg/storage/entry_application_state_buf.go
@@ -0,0 +1,166 @@
+// Copyright 2019 The Cockroach Authors.
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "fmt" + "sync" +) + +// entryApplicationStateBufNodeSize is the size of the arrays in an +// entryApplicationStateBufNode. +// TODO(ajwerner): justify this number. +const entryApplicationStateBufNodeSize = 8 + +// entryApplicationStateBuf is an allocation-efficient buffer used during the +// application of raft entries. Initialization occurs lazily upon the first +// call to allocate but used entryApplicationStateBuf objects should be released +// explicitly with the destroy() method to release the allocated buffers back +// to the pool. +type entryApplicationStateBuf struct { + len int32 + head, tail *entryApplicationStateBufNode +} + +var entryApplicationStateBufNodeSyncPool = sync.Pool{ + New: func() interface{} { return new(entryApplicationStateBufNode) }, +} + +type entryApplicationStateBufNode struct { + entryApplicationStateRingBuf + next *entryApplicationStateBufNode +} + +func (buf *entryApplicationStateBuf) last() *entryApplicationState { + return buf.tail.at(buf.tail.len - 1) +} + +// allocate extends the length of buf by one and returns the newly +// added element. If this is the fist call to allocate it will initialize buf. +// After a buf is initialized it should be explicitly destroyed. +func (buf *entryApplicationStateBuf) allocate() *entryApplicationState { + if buf.tail == nil { + n := entryApplicationStateBufNodeSyncPool.Get().(*entryApplicationStateBufNode) + buf.head, buf.tail = n, n + } + buf.len++ + if buf.tail.len == entryApplicationStateBufNodeSize { + newTail := entryApplicationStateBufNodeSyncPool.Get().(*entryApplicationStateBufNode) + buf.tail.next = newTail + buf.tail = newTail + } + return buf.tail.allocate() +} + +// destroy releases allocated nodes back into the sync pool. +// It is illegal to use buf after a call to destroy. +func (buf *entryApplicationStateBuf) destroy() { + for cur := buf.head; cur != nil; { + next := cur.next + *cur = entryApplicationStateBufNode{} + entryApplicationStateBufNodeSyncPool.Put(cur) + cur, buf.head = next, next + } + *buf = entryApplicationStateBuf{} +} + +// truncate clears all of the entries currently in a buffer. +func (buf *entryApplicationStateBuf) truncate() { + for buf.head != buf.tail { + buf.len -= buf.head.len + buf.head.truncate(buf.head.len) + oldHead := buf.head + newHead := oldHead.next + buf.head = newHead + *oldHead = entryApplicationStateBufNode{} + entryApplicationStateBufNodeSyncPool.Put(oldHead) + } + buf.head.truncate(buf.len) + buf.len = 0 +} + +// entryApplicationStateRingBuf is a ring-buffer of entryApplicationState. +// It offers indexing and truncation from the front. +type entryApplicationStateRingBuf struct { + len int32 + head int32 + buf [entryApplicationStateBufNodeSize]entryApplicationState +} + +// at returns the application state at the requested idx. +func (rb *entryApplicationStateRingBuf) at(idx int32) *entryApplicationState { + if idx >= rb.len { + panic(fmt.Sprintf("index out of range %v, %v", idx, rb.len)) + } + return &rb.buf[(rb.head+idx)%entryApplicationStateBufNodeSize] +} + +// allocate extends the length of the ring buffer by one and returns the newly +// added element. 
It is illegal to call allocate on a full ring buf. +func (rb *entryApplicationStateRingBuf) allocate() *entryApplicationState { + if rb.len == entryApplicationStateBufNodeSize { + panic("cannot push onto a full entryApplicationStateRingBuf") + } + ret := &rb.buf[(rb.head+rb.len)%entryApplicationStateBufNodeSize] + rb.len++ + return ret +} + +// truncate removes the first n elements from the buffer. +// It is illegal to pass a number greater than the current len. +func (rb *entryApplicationStateRingBuf) truncate(n int32) { + if n > rb.len { + panic("cannot truncate more than have") + } + // TODO(ajwerner): consider removing this as an optimization. + for i := int32(0); i < n; i++ { + *rb.at(i) = entryApplicationState{} + } + rb.len -= n + rb.head += n + rb.head %= entryApplicationStateBufNodeSize +} + +type entryApplicationStateBufIterator struct { + idx int32 + offset int32 + buf *entryApplicationStateBuf + node *entryApplicationStateBufNode +} + +func (it *entryApplicationStateBufIterator) init(buf *entryApplicationStateBuf) bool { + *it = entryApplicationStateBufIterator{ + buf: buf, + node: buf.head, + } + return it.buf.len > 0 +} + +func (it *entryApplicationStateBufIterator) state() *entryApplicationState { + return it.node.at(it.offset) +} + +func (it *entryApplicationStateBufIterator) isLast() bool { + return it.idx+1 == it.buf.len +} + +func (it *entryApplicationStateBufIterator) next() bool { + if it.idx+1 == it.buf.len { + return false + } + it.idx++ + it.offset++ + if it.offset == entryApplicationStateBufNodeSize { + it.node = it.node.next + it.offset = 0 + } + return true +} diff --git a/pkg/storage/entry_application_state_buf_test.go b/pkg/storage/entry_application_state_buf_test.go new file mode 100644 index 000000000000..874368959d24 --- /dev/null +++ b/pkg/storage/entry_application_state_buf_test.go @@ -0,0 +1,44 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +// TestApplicationStateBuf is an overly simplistic test of the +// entryApplicationStateBuf behavior. +func TestApplicationStateBuf(t *testing.T) { + defer leaktest.AfterTest(t)() + var buf entryApplicationStateBuf + var states []*entryApplicationState + for i := 0; i < 5*entryApplicationStateBufNodeSize+1; i++ { + assert.Equal(t, i, int(buf.len)) + states = append(states, buf.allocate()) + assert.Equal(t, i+1, int(buf.len)) + } + last := states[len(states)-1] + assert.Equal(t, last, buf.last()) + var it entryApplicationStateBufIterator + i := 0 + for ok := it.init(&buf); ok; ok = it.next() { + assert.Equal(t, states[i], it.state()) + i++ + } + buf.truncate() + assert.Equal(t, 0, int(buf.len)) + assert.Equal(t, last, buf.last()) + buf.destroy() + assert.EqualValues(t, buf, entryApplicationStateBuf{}) +} diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go new file mode 100644 index 000000000000..f75439580b76 --- /dev/null +++ b/pkg/storage/replica_application.go @@ -0,0 +1,1394 @@ +// Copyright 2019 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + "fmt" + "sync" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/storage/stateloader" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/kr/pretty" + "github.com/pkg/errors" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +// handleCommittedEntriesRaftMuLocked deals with the complexities involved in +// moving the Replica's replicated state machine forward given committed raft +// entries. All changes to r.mu.state occur downstream of this call. +// +// The stats argument is updated to reflect the number of entries which are +// processed. The refreshReason argument is the current refreshReason for this +// raftReady and the returned refreshRaftReason reflects any update to this +// value encountered during the processing of these entries. +// +// At a high level, it receives committed entries which each contains the +// evaluation of a batch (at its heart a WriteBatch, to be applied to the +// underlying storage engine), which it decodes into batches of entries which +// are safe to apply atomically together. +// +// The pseudocode looks like: +// +// while entries: +// for each entry in entries: +// decode front, add to batch, pop front from entries +// if entry is non-trivial: +// break +// for each entry in batch: +// check if failed +// if not: +// apply to batch +// commit batch +// update Replica.state +// for each entry in batch: +// ack client, release latches +// +// The processing of committed entries proceeds in 4 stages, decoding, +// local preparation, staging, and application. Commands may be applied together +// so long as their implied state change is "trivial" (see isTrivial). Once +// decoding has discovered a batch boundary, the commands are prepared by +// reading the current replica state from underneath the Replica.mu and +// determining whether any of the commands were proposed locally. Next each +// command is written to the engine.Batch and have the "trivial" component of +// their ReplicatedEvalResult applied to the batch's view of the ReplicaState. +// Finally the batch is written to the storage engine and its side effects on +// the Replica's state are applied. +func (r *Replica) handleCommittedEntriesRaftMuLocked( + ctx context.Context, + committedEntries []raftpb.Entry, + stats *handleRaftReadyStats, + prevReason refreshRaftReason, +) (refreshReason refreshRaftReason, expl string, err error) { + + // NB: We want to batch application of commands which have a "trivial" + // impact on replicated range state. 
+ // + // To deal with this we create a batch up front. For each command we check its + // type. If it's a conf change we know it'll be handled on its own. + // If it's a regular entry with an interesting ReplicatedEvalResult then we + // also want to handle it on its own. + // + // For things which we do not want to handle on their own we call + // processRaftCommand with the current batch and the current view of in-memory + // state. + b := getEntryApplicationBatch() + defer releaseEntryApplicationBatch(b) + for len(committedEntries) > 0 { + var sawEmptyEntry bool + sawEmptyEntry, committedEntries, expl, err = b.decode(ctx, committedEntries) + // etcd raft occasionally adds a nil entry (our own commands are never + // empty). This happens in two situations: + // When a new leader is elected, and when a config change is dropped due + // to the "one at a time" rule. In both cases we may need to resubmit our + // pending proposals (In the former case we resubmit everything because + // we proposed them to a former leader that is no longer able to commit + // them. In the latter case we only need to resubmit pending config + // changes, but it's hard to distinguish so we resubmit everything + // anyway). We delay resubmission until after we have processed the + // entire batch of entries. + if sawEmptyEntry { + // Overwrite unconditionally since this is the most aggressive + // reproposal mode. + if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange { + refreshReason = reasonNewLeaderOrConfigChange + } + } + if err != nil { + return refreshReason, expl, err + } + r.retreiveLocalProposals(ctx, b) + b.batch = r.store.engine.NewBatch() + // Stage each of the commands which will write them into the newly created + // engine.Batch and update b's view of the replicaState. + var it entryApplicationStateBufIterator + for ok := it.init(&b.appStates); ok; ok = it.next() { + s := it.state() + r.stageRaftCommand(s.ctx, s, b.batch, &b.replicaState, it.isLast()) + updatedTruncatedState := applyTrivialReplicatedEvalResult(s.ctx, + s.rResult(), s.e.Index, s.leaseIndex, &b.replicaState) + b.updatedTruncatedState = b.updatedTruncatedState || updatedTruncatedState + } + if expl, err = r.applyBatch(ctx, b, stats); err != nil { + return refreshReason, expl, err + } + } + return refreshReason, "", nil +} + +func (r *Replica) retreiveLocalProposals(ctx context.Context, b *entryApplicationBatch) { + r.mu.Lock() + defer r.mu.Unlock() + b.replicaState = r.mu.state + // Copy stats as it gets updated in-place in applyRaftCommandToBatch. + b.replicaState.Stats = &b.stats + *b.replicaState.Stats = *r.mu.state.Stats + var it entryApplicationStateBufIterator + for ok := it.init(&b.appStates); ok; ok = it.next() { + s := it.state() + s.proposal = r.mu.proposals[s.idKey] + if s.proposedLocally() { + // We initiated this command, so use the caller-supplied context. + s.ctx = s.proposal.ctx + delete(r.mu.proposals, s.idKey) + // At this point we're not guaranteed to have proposalQuota initialized, + // the same is true for quotaReleaseQueues. Only queue the proposal's + // quota for release if the proposalQuota is initialized. + if r.mu.proposalQuota != nil { + r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, s.proposal.quotaSize) + } + } + } +} + +// stageRaftCommand handles the first phase of applying a command to the +// replica state machine. 
+// +// The proposal also contains auxiliary data which needs to be verified in +// order to decide whether the proposal should be applied: the command's +// MaxLeaseIndex must move the state machine's LeaseAppliedIndex forward, and +// the proposer's lease (or rather its sequence number) must match that of the +// state machine, and lastly the GCThreshold is validated. If any of the checks +// fail, the proposal's content is wiped and we apply an empty log entry +// instead, returning an error to the caller to handle. The two typical cases +// are the lease mismatch (in which case the caller tries to send the command +// to the actual leaseholder) and violations of the LeaseAppliedIndex (in which +// case the proposal is retried if it was proposed locally). +// +// Assuming all checks were passed, the command is applied to the batch, +// which is done by the aptly named applyRaftCommandToBatch. +// +// For trivial proposals this is the whole story, but some commands trigger +// additional code in this method. The standard way in which this is triggered +// is via a side effect communicated in the proposal's ReplicatedEvalResult +// and, for local proposals, the LocalEvalResult. These might, for example, +// trigger an update of the Replica's in-memory state to match updates to the +// on-disk state, or pass intents to the intent resolver. Some commands don't +// fit this simple schema and need to hook deeper into the code. Notably splits +// and merges need to acquire locks on their right-hand side Replicas and may +// need to add data to the WriteBatch before it is applied; similarly, changes +// to the disk layout of internal state typically require a migration which +// shows up here. Any of this logic however is deferred until after the batch +// has been written to the storage engine. +func (r *Replica) stageRaftCommand( + ctx context.Context, + s *entryApplicationState, + batch engine.Batch, + replicaState *storagepb.ReplicaState, + writeAppliedState bool, +) { + if s.e.Index == 0 { + log.Fatalf(ctx, "processRaftCommand requires a non-zero index") + } + if log.V(4) { + log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", + s.idKey, s.raftCmd.MaxLeaseIndex) + } + + var ts hlc.Timestamp + if s.idKey != "" { + ts = s.rResult().Timestamp + } + + s.leaseIndex, s.proposalRetry, s.forcedErr = checkForcedErr(ctx, + s.idKey, s.raftCmd, s.proposal, s.proposedLocally(), replicaState) + if s.forcedErr == nil { + // Verify that the batch timestamp is after the GC threshold. This is + // necessary because not all commands declare read access on the GC + // threshold key, even though they implicitly depend on it. This means + // that access to this state will not be serialized by latching, + // so we must perform this check upstream and downstream of raft. + // See #14833. + // + // We provide an empty key span because we already know that the Raft + // command is allowed to apply within its key range. This is guaranteed + // by checks upstream of Raft, which perform the same validation, and by + // span latches, which assure that any modifications to the range's + // boundaries will be serialized with this command. Finally, the + // leaseAppliedIndex check in checkForcedErrLocked ensures that replays + // outside of the spanlatch manager's control which break this + // serialization ordering will already by caught and an error will be + // thrown. 
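+ // With an empty span and the command's timestamp, this call effectively
+ // reduces to the GC threshold check described above.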
+ s.forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts)) + } + + // applyRaftCommandToBatch will return "expected" errors, but may also indicate + // replica corruption (as of now, signaled by a replicaCorruptionError). + // We feed its return through maybeSetCorrupt to act when that happens. + if s.forcedErr != nil { + log.VEventf(ctx, 1, "applying command with forced error: %s", s.forcedErr) + } else { + log.Event(ctx, "applying command") + + if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, s.raftCmd); err != nil { + log.Eventf(ctx, "unable to acquire split lock: %s", err) + // Send a crash report because a former bug in the error handling might have + // been the root cause of #19172. + _ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) { + log.SendCrashReport( + ctx, + &r.store.cfg.Settings.SV, + 0, // depth + "while acquiring split lock: %s", + []interface{}{err}, + log.ReportTypeError, + ) + }) + + s.forcedErr = roachpb.NewError(err) + } else if splitMergeUnlock != nil { + // Close over raftCmd to capture its value at execution time; we clear + // ReplicatedEvalResult on certain errors. + s.splitMergeUnlock = splitMergeUnlock + } + } + + var writeBatch *storagepb.WriteBatch + { + if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; s.forcedErr == nil && filter != nil { + var newPropRetry int + newPropRetry, s.forcedErr = filter(storagebase.ApplyFilterArgs{ + CmdID: s.idKey, + ReplicatedEvalResult: *s.rResult(), + StoreID: r.store.StoreID(), + RangeID: r.RangeID, + }) + if s.proposalRetry == 0 { + s.proposalRetry = proposalReevaluationReason(newPropRetry) + } + } + + if s.forcedErr != nil { + // Apply an empty entry. + *s.rResult() = storagepb.ReplicatedEvalResult{} + s.raftCmd.WriteBatch = nil + s.raftCmd.LogicalOpLog = nil + } + + // Update the node clock with the serviced request. This maintains + // a high water mark for all ops serviced, so that received ops without + // a timestamp specified are guaranteed one higher than any op already + // executed for overlapping keys. + r.store.Clock().Update(ts) + + if s.raftCmd.WriteBatch != nil { + writeBatch = s.raftCmd.WriteBatch + } + + if deprecatedDelta := s.rResult().DeprecatedDelta; deprecatedDelta != nil { + s.rResult().Delta = deprecatedDelta.ToStatsDelta() + s.rResult().DeprecatedDelta = nil + } + + // AddSSTable ingestions run before the actual batch. This makes sure + // that when the Raft command is applied, the ingestion has definitely + // succeeded. Note that we have taken precautions during command + // evaluation to avoid having mutations in the WriteBatch that affect + // the SSTable. Not doing so could result in order reversal (and missing + // values) here. If the key range we are ingesting into isn't empty, + // we're not using AddSSTable but a plain WriteBatch. + if s.rResult().AddSSTable != nil { + copied := addSSTablePreApply( + ctx, + r.store.cfg.Settings, + r.store.engine, + r.raftMu.sideloaded, + s.e.Term, + s.e.Index, + *s.rResult().AddSSTable, + r.store.limiters.BulkIOWriteRate, + ) + r.store.metrics.AddSSTableApplications.Inc(1) + if copied { + r.store.metrics.AddSSTableApplicationCopies.Inc(1) + } + s.rResult().AddSSTable = nil + } + + if s.rResult().Split != nil { + // Splits require a new HardState to be written to the new RHS + // range (and this needs to be atomic with the main batch). 
This + // cannot be constructed at evaluation time because it differs + // on each replica (votes may have already been cast on the + // uninitialized replica). Transform the write batch to add the + // updated HardState. + // See https://github.com/cockroachdb/cockroach/issues/20629 + // + // This is not the most efficient, but it only happens on splits, + // which are relatively infrequent and don't write much data. + tmpBatch := r.store.engine.NewBatch() + if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { + log.Fatal(ctx, err) + } + splitPreApply(ctx, tmpBatch, s.rResult().Split.SplitTrigger) + writeBatch.Data = tmpBatch.Repr() + tmpBatch.Close() + } + + if merge := s.rResult().Merge; merge != nil { + // Merges require the subsumed range to be atomically deleted when the + // merge transaction commits. + // + // This is not the most efficient, but it only happens on merges, + // which are relatively infrequent and don't write much data. + tmpBatch := r.store.engine.NewBatch() + if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { + log.Fatal(ctx, err) + } + rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID) + if err != nil { + log.Fatal(ctx, err) + } + const destroyData = false + err = rhsRepl.preDestroyRaftMuLocked(ctx, tmpBatch, tmpBatch, merge.RightDesc.NextReplicaID, destroyData) + if err != nil { + log.Fatal(ctx, err) + } + writeBatch.Data = tmpBatch.Repr() + tmpBatch.Close() + } + + { + var err error + *s.rResult(), err = r.applyRaftCommandToBatch( + s.ctx, s.idKey, *s.rResult(), s.e.Index, s.leaseIndex, writeBatch, replicaState, batch, writeAppliedState) + + // applyRaftCommandToBatch returned an error, which usually indicates + // either a serious logic bug in CockroachDB or a disk + // corruption/out-of-space issue. Make sure that these fail with + // descriptive message so that we can differentiate the root causes. + if err != nil { + log.Errorf(ctx, "unable to update the state machine: %s", err) + // Report the fatal error separately and only with the error, as that + // triggers an optimization for which we directly report the error to + // sentry (which in turn allows sentry to distinguish different error + // types). + log.Fatal(ctx, err) + } + } + } +} + +func checkForcedErr( + ctx context.Context, + idKey storagebase.CmdIDKey, + raftCmd storagepb.RaftCommand, + proposal *ProposalData, + proposedLocally bool, + replicaState *storagepb.ReplicaState, +) (uint64, proposalReevaluationReason, *roachpb.Error) { + leaseIndex := replicaState.LeaseAppliedIndex + isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest + var requestedLease roachpb.Lease + if isLeaseRequest { + requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease + } + if idKey == "" { + // This is an empty Raft command (which is sent by Raft after elections + // to trigger reproposals or during concurrent configuration changes). + // Nothing to do here except making sure that the corresponding batch + // (which is bogus) doesn't get executed (for it is empty and so + // properties like key range are undefined). + return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") + } + + // Verify the lease matches the proposer's expectation. We rely on + // the proposer's determination of whether the existing lease is + // held, and can be used, or is expired, and can be replaced. + // Verify checks that the lease has not been modified since proposal + // due to Raft delays / reorderings. 
+ // To understand why this lease verification is necessary, see comments on the + // proposer_lease field in the proto. + leaseMismatch := false + if raftCmd.DeprecatedProposerLease != nil { + // VersionLeaseSequence must not have been active when this was proposed. + // + // This does not prevent the lease race condition described below. The + // reason we don't fix this here as well is because fixing the race + // requires a new cluster version which implies that we'll already be + // using lease sequence numbers and will fall into the case below. + leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*replicaState.Lease) + } else { + leaseMismatch = raftCmd.ProposerLeaseSequence != replicaState.Lease.Sequence + if !leaseMismatch && isLeaseRequest { + // Lease sequence numbers are a reflection of lease equivalency + // between subsequent leases. However, Lease.Equivalent is not fully + // symmetric, meaning that two leases may be Equivalent to a third + // lease but not Equivalent to each other. If these leases are + // proposed under that same third lease, neither will be able to + // detect whether the other has applied just by looking at the + // current lease sequence number because neither will will increment + // the sequence number. + // + // This can lead to inversions in lease expiration timestamps if + // we're not careful. To avoid this, if a lease request's proposer + // lease sequence matches the current lease sequence and the current + // lease sequence also matches the requested lease sequence, we make + // sure the requested lease is Equivalent to current lease. + if replicaState.Lease.Sequence == requestedLease.Sequence { + // It is only possible for this to fail when expiration-based + // lease extensions are proposed concurrently. + leaseMismatch = !replicaState.Lease.Equivalent(requestedLease) + } + + // This is a check to see if the lease we proposed this lease request against is the same + // lease that we're trying to update. We need to check proposal timestamps because + // extensions don't increment sequence numbers. Without this check a lease could + // be extended and then another lease proposed against the original lease would + // be applied over the extension. + if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && + (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *replicaState.Lease.ProposedTS) { + leaseMismatch = true + } + } + } + if leaseMismatch { + log.VEventf( + ctx, 1, + "command proposed from replica %+v with lease #%d incompatible to %v", + raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *replicaState.Lease, + ) + if isLeaseRequest { + // For lease requests we return a special error that + // redirectOnOrAcquireLease() understands. Note that these + // requests don't go through the DistSender. + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "proposed under invalid lease", + }) + } + // We return a NotLeaseHolderError so that the DistSender retries. 
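+ // The returned error carries the current lease, which lets the DistSender
+ // update its leaseholder cache and redirect the retry.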
+ nlhe := newNotLeaseHolderError( + replicaState.Lease, raftCmd.ProposerReplica.StoreID, replicaState.Desc) + nlhe.CustomMsg = fmt.Sprintf( + "stale proposal: command was proposed under lease #%d but is being applied "+ + "under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease) + return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) + } + + if isLeaseRequest { + // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This + // makes sense since lease commands are proposed by anyone, so we can't expect a coherent + // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the + // counter makes sense from a testing perspective. + // + // However, leases get special vetting to make sure we don't give one to a replica that was + // since removed (see #15385 and a comment in redirectOnOrAcquireLease). + if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "replica not part of range", + }) + } + } else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { + // The happy case: the command is applying at or ahead of the minimal + // permissible index. It's ok if it skips a few slots (as can happen + // during rearrangement); this command will apply, but later ones which + // were proposed at lower indexes may not. Overall though, this is more + // stable and simpler than requiring commands to apply at their exact + // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 + // is otherwise tricky since we can't tell the client to try again + // (reproposals could exist and may apply at the right index, leading + // to a replay), and assigning the required index would be tedious + // seeing that it would have to rewind sometimes. + leaseIndex = raftCmd.MaxLeaseIndex + } else { + // The command is trying to apply at a past log position. That's + // unfortunate and hopefully rare; the client on the proposer will try + // again. Note that in this situation, the leaseIndex does not advance. + retry := proposalNoReevaluation + if proposedLocally { + log.VEventf( + ctx, 1, + "retry proposal %x: applied at lease index %d, required < %d", + proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, + ) + retry = proposalIllegalLeaseIndex + } + return leaseIndex, retry, roachpb.NewErrorf( + "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, + ) + } + return leaseIndex, proposalNoReevaluation, nil +} + +// applyRaftCommandToBatch applies a raft command from the replicated log to the +// current batch's view of the underlying state machine. When the state machine +// cannot be updated, an error (which is likely fatal!) is returned and must be +// handled by the caller. +// The returned ReplicatedEvalResult replaces the caller's. +func (r *Replica) applyRaftCommandToBatch( + ctx context.Context, + idKey storagebase.CmdIDKey, + rResult storagepb.ReplicatedEvalResult, + raftAppliedIndex, leaseAppliedIndex uint64, + writeBatch *storagepb.WriteBatch, + replicaState *storagepb.ReplicaState, + batch engine.Batch, + writeAppliedState bool, +) (storagepb.ReplicatedEvalResult, error) { + if writeBatch != nil && len(writeBatch.Data) > 0 { + // Record the write activity, passing a 0 nodeID because replica.writeStats + // intentionally doesn't track the origin of the writes. 
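+ // RocksDBBatchCount reads the mutation count from the batch repr header
+ // rather than iterating over the batch contents.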
+ mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) + if err != nil { + log.Errorf(ctx, "unable to read header of committed WriteBatch: %s", err) + } else { + r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) + } + } + + // Exploit the fact that a split will result in a full stats + // recomputation to reset the ContainsEstimates flag. + // + // TODO(tschottdorf): We want to let the usual MVCCStats-delta + // machinery update our stats for the left-hand side. But there is no + // way to pass up an MVCCStats object that will clear out the + // ContainsEstimates flag. We should introduce one, but the migration + // makes this worth a separate effort (ContainsEstimates would need to + // have three possible values, 'UNCHANGED', 'NO', and 'YES'). + // Until then, we're left with this rather crude hack. + if rResult.Split != nil { + replicaState.Stats.ContainsEstimates = false + } + ms := replicaState.Stats + + if raftAppliedIndex != replicaState.RaftAppliedIndex+1 { + // If we have an out of order index, there's corruption. No sense in + // trying to update anything or running the command. Simply return + // a corruption error. + return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d", + replicaState.RaftAppliedIndex, raftAppliedIndex) + } + + haveTruncatedState := rResult.State != nil && rResult.State.TruncatedState != nil + + if writeBatch != nil { + if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch") + } + } + + // The only remaining use of the batch is for range-local keys which we know + // have not been previously written within this batch. + writer := batch.Distinct() + + // Special-cased MVCC stats handling to exploit commutativity of stats delta + // upgrades. Thanks to commutativity, the spanlatch manager does not have to + // serialize on the stats key. + deltaStats := rResult.Delta.ToStats() + usingAppliedStateKey := replicaState.UsingAppliedStateKey + if !usingAppliedStateKey && rResult.State != nil && rResult.State.UsingAppliedStateKey { + // The Raft command wants us to begin using the RangeAppliedState key + // and we haven't performed the migration yet. Delete the old keys + // that this new key is replacing. + err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats) + if err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state") + } + usingAppliedStateKey = true + } + + if !writeAppliedState { + // Don't do anything with the truncated state. + } else if usingAppliedStateKey { + // Note that calling ms.Add will never result in ms.LastUpdateNanos + // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos + // across all deltaStats). + ms.Add(deltaStats) + + // Set the range applied state, which includes the last applied raft and + // lease index along with the mvcc stats, all in one key. + if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer, + raftAppliedIndex, leaseAppliedIndex, ms); err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state") + } + ms.Subtract(deltaStats) + } else { + // Advance the last applied index. We use a blind write in order to avoid + // reading the previous applied index keys on every write operation. This + // requires a little additional work in order maintain the MVCC stats. 
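+ // appliedIndexNewMS captures the system bytes written by the blind put
+ // below; the size difference versus the previous applied index keys is
+ // folded into deltaStats.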
+ var appliedIndexNewMS enginepb.MVCCStats + if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS, + raftAppliedIndex, leaseAppliedIndex); err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index") + } + deltaStats.SysBytes += appliedIndexNewMS.SysBytes - + r.raftMu.stateLoader.CalcAppliedIndexSysBytes(replicaState.RaftAppliedIndex, replicaState.LeaseAppliedIndex) + + // Note that calling ms.Add will never result in ms.LastUpdateNanos + // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos + // across all deltaStats). + ms.Add(deltaStats) + if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, ms); err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats") + } + ms.Subtract(deltaStats) + } + + if haveTruncatedState { + apply, err := handleTruncatedStateBelowRaft(ctx, replicaState.TruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer) + if err != nil { + return storagepb.ReplicatedEvalResult{}, err + } + if !apply { + // The truncated state was discarded, so make sure we don't apply + // it to our in-memory state. + rResult.State.TruncatedState = nil + rResult.RaftLogDelta = 0 + // TODO(ajwerner): consider moving this code. + // We received a truncation that doesn't apply to us, so we know that + // there's a leaseholder out there with a log that has earlier entries + // than ours. That leader also guided our log size computations by + // giving us RaftLogDeltas for past truncations, and this was likely + // off. Mark our Raft log size is not trustworthy so that, assuming + // we step up as leader at some point in the future, we recompute + // our numbers. + r.mu.Lock() + r.mu.raftLogSizeTrusted = false + r.mu.Unlock() + } + } + + // TODO(peter): We did not close the writer in an earlier version of + // the code, which went undetected even though we used the batch after + // (though only to commit it). We should add an assertion to prevent that in + // the future. + writer.Close() + + start := timeutil.Now() + + var assertHS *raftpb.HardState + if util.RaceEnabled && rResult.Split != nil { + rsl := stateloader.Make(rResult.Split.RightDesc.RangeID) + oldHS, err := rsl.LoadHardState(ctx, batch) + if err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState") + } + assertHS = &oldHS + } + + if assertHS != nil { + // Load the HardState that was just committed (if any). + rsl := stateloader.Make(rResult.Split.RightDesc.RangeID) + newHS, err := rsl.LoadHardState(ctx, batch) + if err != nil { + return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState") + } + // Assert that nothing moved "backwards". + if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) { + log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s", + pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS)) + } + } + + elapsed := timeutil.Since(start) + r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) + rResult.Delta = deltaStats.ToStatsDelta() + return rResult, nil +} + +// handleTrivialReplicatedEvalResult applies the trivial portions of rResult to +// the supplied state value and returns whether the change implied an updated to +// the replica's truncated state. 
This function modifies replicaState but does +// not modify rResult in order to give the TestingPostApplyFilter testing knob +// an opportunity to inspect the command's ReplicatedEvalResult. +func applyTrivialReplicatedEvalResult( + ctx context.Context, + rResult *storagepb.ReplicatedEvalResult, + raftAppliedIndex, leaseAppliedIndex uint64, + replicaState *storagepb.ReplicaState, +) (truncatedStateUpdated bool) { + deltaStats := rResult.Delta.ToStats() + replicaState.Stats.Add(deltaStats) + if raftAppliedIndex != 0 { + replicaState.RaftAppliedIndex = raftAppliedIndex + } + if leaseAppliedIndex != 0 { + replicaState.LeaseAppliedIndex = leaseAppliedIndex + } + haveState := rResult.State != nil + truncatedStateUpdated = haveState && rResult.State.TruncatedState != nil + if truncatedStateUpdated { + replicaState.TruncatedState = rResult.State.TruncatedState + } + return truncatedStateUpdated +} + +// applyBatch handles the logic of writing a batch to the storage engine and +// applying it to the Replica state machine. Upon success this method clears b +// for reuse before returning. +func (r *Replica) applyBatch( + ctx context.Context, b *entryApplicationBatch, stats *handleRaftReadyStats, +) (expl string, err error) { + defer b.reset() + if log.V(4) { + log.Infof(ctx, "flushing batch %v of %d entries", b.replicaState, b.appStates.len) + } + // Entry application is not done without syncing to disk. + // The atomicity guarantees of the batch and the fact that the applied state + // is stored in this batch, ensure that if the batch ends up not being durably + // committed then the entries in this batch will be applied again upon + // startup. + if err := b.batch.Commit(false); err != nil { + log.Fatalf(ctx, "failed to commit Raft entry batch: %v", err) + } + b.batch.Close() + b.batch = nil + // NB: we compute the triviality of the batch here again rather than storing + // it as it may have changed during processing. In particular, the upgrade of + // applied state will have already occurred. + var batchIsNonTrivial bool + // Non-trivial entries are always last in a batch + if s := b.appStates.last(); !isTrivial(s.rResult(), b.replicaState.UsingAppliedStateKey) { + batchIsNonTrivial = true + // Deal with locking sometimes associated with complex commands. + if unlock := s.splitMergeUnlock; unlock != nil { + defer unlock(s.rResult()) + s.splitMergeUnlock = nil + } + if s.rResult().BlockReads { + r.readOnlyCmdMu.Lock() + defer r.readOnlyCmdMu.Unlock() + s.rResult().BlockReads = false + } + } + // Now that the batch is committed we can go about applying the side effects + // of the update to the truncated state. Note that this is safe only if the + // new truncated state is durably on disk (i.e.) synced. + var truncationDelta int64 + if b.updatedTruncatedState { + truncState := b.replicaState.TruncatedState + r.store.raftEntryCache.Clear(r.RangeID, truncState.Index+1) + // Truncate the sideloaded storage. This is true at the time of writing but unfortunately + // could rot. + { + log.VEventf(ctx, 1, "truncating sideloaded storage up to (and including) index %d", truncState.Index) + if truncationDelta, _, err = r.raftMu.sideloaded.TruncateTo(ctx, truncState.Index+1); err != nil { + // We don't *have* to remove these entries for correctness. Log a + // loud error, but keep humming along. 
+ log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err) + } + } + } + applyRaftLogDelta := func(raftLogDelta int64) { + if raftLogDelta == 0 { + return + } + r.mu.raftLogSize += raftLogDelta + if r.mu.raftLogSize < 0 { + r.mu.raftLogSize = 0 + } + r.mu.raftLogLastCheckSize += raftLogDelta + if r.mu.raftLogLastCheckSize < 0 { + r.mu.raftLogLastCheckSize = 0 + } + } + // deltaStats will store the delta from the current state to the new state + // which will be used to update the metrics. + r.mu.Lock() + r.mu.state.RaftAppliedIndex = b.replicaState.RaftAppliedIndex + r.mu.state.LeaseAppliedIndex = b.replicaState.LeaseAppliedIndex + prevStats := *r.mu.state.Stats + *r.mu.state.Stats = *b.replicaState.Stats + // Iterate through the commands and their replicated eval results to apply + // their raft log deltas. + // Finally apply the truncation delta. + // Store the queuing conditions + var it entryApplicationStateBufIterator + for ok := it.init(&b.appStates); ok; ok = it.next() { + s := it.state() + applyRaftLogDelta(s.rResult().RaftLogDelta) + s.rResult().RaftLogDelta = 0 + } + applyRaftLogDelta(-1 * truncationDelta) + checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize + needsSplitBySize := r.needsSplitBySizeRLocked() + needsMergeBySize := r.needsMergeBySizeRLocked() + if b.updatedTruncatedState { + r.mu.state.TruncatedState = b.replicaState.TruncatedState + } + r.mu.Unlock() + deltaStats := *b.replicaState.Stats + deltaStats.Subtract(prevStats) + r.store.metrics.addMVCCStats(deltaStats) + // NB: the bootstrap store has a nil split queue. + // TODO(tbg): the above is probably a lie now. + if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) { + r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + // The bootstrap store has a nil merge queue. + // TODO(tbg): the above is probably a lie now. + if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { + // TODO(tbg): for ranges which are small but protected from merges by + // other means (zone configs etc), this is called on every command, and + // fires off a goroutine each time. Make this trigger (and potentially + // the split one above, though it hasn't been observed to be as + // bothersome) less aggressive. + r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + if checkRaftLog { + r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + for ok := it.init(&b.appStates); ok; ok = it.next() { + s := it.state() + rResult := s.rResult() + for _, sc := range rResult.SuggestedCompactions { + r.store.compactor.Suggest(s.ctx, sc) + } + rResult.SuggestedCompactions = nil + } + + for ok := it.init(&b.appStates); ok; ok = it.next() { + s := it.state() + // Set up the local result prior to handling the ReplicatedEvalResult to + // give testing knobs an opportunity to inspect it. + r.prepareLocalResult(s.ctx, s) + // Handle the ReplicatedEvalResult, executing any side effects of the last + // state machine transition. + // + // Note that this must happen after committing (the engine.Batch), but + // before notifying a potentially waiting client. 
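+ // First zero out the fields that were already applied to the batch's view
+ // of the replica state while staging, so that the assertions below only
+ // see side effects that still need to be handled.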
+ clearTrivialReplicatedEvalResultFields(s.rResult(), b.replicaState.UsingAppliedStateKey) + isNonTrivialEntry := batchIsNonTrivial && it.isLast() + if isNonTrivialEntry && !r.handleComplexReplicatedEvalResult(s.ctx, *s.rResult()) { + panic("non-trivial batch did not require state assertion") + } + if !isNonTrivialEntry && !s.rResult().Equal(storagepb.ReplicatedEvalResult{}) { + panic(fmt.Errorf("failed to handle all side-effects of ReplicatedEvalResult: %v", s.rResult())) + } + + // NB: Perform state assertion before acknowledging the client. + // Some tests (TestRangeStatsInit) assumes that once the store has started + // and the first range has a lease that there will not be a later hard-state. + if isNonTrivialEntry { + // Assert that the on-disk state doesn't diverge from the in-memory + // state as a result of the side effects. + r.mu.Lock() + r.assertStateLocked(ctx, r.store.Engine()) + r.mu.Unlock() + } + + if s.localResult != nil { + r.handleLocalEvalResult(s.ctx, *s.localResult) + } + r.finishRaftCommand(s.ctx, s) + stats.processed++ + switch s.e.Type { + case raftpb.EntryNormal: + if s.rResult().ChangeReplicas != nil { + log.Fatalf(s.ctx, "unexpected replication change from command %s", &s.raftCmd) + } + case raftpb.EntryConfChange: + if s.rResult().ChangeReplicas == nil { + s.cc = raftpb.ConfChange{} + } + if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + raftGroup.ApplyConfChange(s.cc) + return true, nil + }); err != nil { + const expl = "during ApplyConfChange" + return expl, errors.Wrap(err, expl) + } + } + + } + return "", nil +} + +// clearTrivialReplicatedEvalResultFields is used to zero out the fields of a +// ReplicatedEvalResult that have already been consumed when staging the +// corresponding command and applying it to the current batch's view of the +// ReplicaState. This function is called after a batch has been written to the +// storage engine. For trivial commands this function should result is a zero +// value rResult. +func clearTrivialReplicatedEvalResultFields( + rResult *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool, +) { + // Fields for which no action is taken in this method are zeroed so that + // they don't trigger an assertion at the end of the application process + // (which checks that all fields were handled). + rResult.IsLeaseRequest = false + rResult.Timestamp = hlc.Timestamp{} + rResult.PrevLeaseProposal = nil + // The state fields cleared here were already applied to the in-memory view of + // replica state for this batch. + if haveState := rResult.State != nil; haveState { + rResult.State.Stats = nil + rResult.State.TruncatedState = nil + + // If we're already using the AppliedStateKey then there's nothing + // to do. This flag is idempotent so it's ok that we see this flag + // multiple times, but we want to make sure it doesn't cause us to + // perform repeated state assertions, so clear it before the + // shouldAssert determination. + // A reader might wonder if using the value of usingAppliedState key from + // after applying an entire batch is valid to determine whether this command + // implied a transition, but if it had implied a transition then the batch + // would not have been considered trivial and the current view of will still + // be false as complex state transitions are handled after this call. + if usingAppliedStateKey { + rResult.State.UsingAppliedStateKey = false + } + // ReplicaState.Stats was previously non-nullable which caused nodes to + // send a zero-value MVCCStats structure. 
If the proposal was generated by + // an old node, we'll have decoded that zero-value structure setting + // ReplicaState.Stats to a non-nil value which would trigger the "unhandled + // field in ReplicatedEvalResult" assertion to fire if we didn't clear it. + if rResult.State.Stats != nil && (*rResult.State.Stats == enginepb.MVCCStats{}) { + rResult.State.Stats = nil + } + if *rResult.State == (storagepb.ReplicaState{}) { + rResult.State = nil + } + } + rResult.Delta = enginepb.MVCCStatsDelta{} +} + +func (r *Replica) handleComplexReplicatedEvalResult( + ctx context.Context, rResult storagepb.ReplicatedEvalResult, +) (shouldAssert bool) { + + // The rest of the actions are "nontrivial" and may have large effects on the + // in-memory and on-disk ReplicaStates. If any of these actions are present, + // we want to assert that these two states do not diverge. + shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) + + // Process Split or Merge. This needs to happen after stats update because + // of the ContainsEstimates hack. + if rResult.Split != nil { + splitPostApply( + r.AnnotateCtx(ctx), + rResult.Split.RHSDelta, + &rResult.Split.SplitTrigger, + r, + ) + rResult.Split = nil + } + + if rResult.Merge != nil { + if err := r.store.MergeRange( + ctx, r, rResult.Merge.LeftDesc, rResult.Merge.RightDesc, rResult.Merge.FreezeStart, + ); err != nil { + // Our in-memory state has diverged from the on-disk state. + log.Fatalf(ctx, "failed to update store after merging range: %s", err) + } + rResult.Merge = nil + } + + // Update the remaining ReplicaState. + + if rResult.State != nil { + if newDesc := rResult.State.Desc; newDesc != nil { + r.setDesc(ctx, newDesc) + rResult.State.Desc = nil + } + + if newLease := rResult.State.Lease; newLease != nil { + r.leasePostApply(ctx, *newLease, false /* permitJump */) + rResult.State.Lease = nil + } + + if newThresh := rResult.State.GCThreshold; newThresh != nil { + if (*newThresh != hlc.Timestamp{}) { + r.mu.Lock() + r.mu.state.GCThreshold = newThresh + r.mu.Unlock() + } + rResult.State.GCThreshold = nil + } + + if newThresh := rResult.State.TxnSpanGCThreshold; newThresh != nil { + if (*newThresh != hlc.Timestamp{}) { + r.mu.Lock() + r.mu.state.TxnSpanGCThreshold = newThresh + r.mu.Unlock() + } + rResult.State.TxnSpanGCThreshold = nil + } + + if rResult.State.UsingAppliedStateKey { + r.mu.Lock() + r.mu.state.UsingAppliedStateKey = true + r.mu.Unlock() + rResult.State.UsingAppliedStateKey = false + } + + if (*rResult.State == storagepb.ReplicaState{}) { + rResult.State = nil + } + } + + if change := rResult.ChangeReplicas; change != nil { + if change.ChangeType == roachpb.REMOVE_REPLICA && + r.store.StoreID() == change.Replica.StoreID { + // This wants to run as late as possible, maximizing the chances + // that the other nodes have finished this command as well (since + // processing the removal from the queue looks up the Range at the + // lease holder, being too early here turns this into a no-op). + // Lock ordering dictates that we don't hold any mutexes when adding, + // so we fire it off in a task. 
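+ // AddAsync performs the enqueue on a separate task, which satisfies the
+ // lock ordering constraint described above.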
+ r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) + } + rResult.ChangeReplicas = nil + } + + if rResult.ComputeChecksum != nil { + r.computeChecksumPostApply(ctx, *rResult.ComputeChecksum) + rResult.ComputeChecksum = nil + } + + if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { + log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) + } + return shouldAssert +} + +// prepareLocalResultis performed after the command has been committed to the +// engine but before its side-effects have been applied to the Replica's +// in-memory state. This method gives the command an opportunity to interact +// with testing knobs and to set up its local result if it was proposed +// locally. This is performed prior to handling the command's +// ReplicatedEvalResult because the process of handling the replicated eval +// result will zero-out the struct to ensure that is has properly performed all +// of the implied side-effects. +func (r *Replica) prepareLocalResult(ctx context.Context, s *entryApplicationState) { + var pErr *roachpb.Error + if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil { + var newPropRetry int + newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{ + CmdID: s.idKey, + ReplicatedEvalResult: *s.rResult(), + StoreID: r.store.StoreID(), + RangeID: r.RangeID, + }) + if s.proposalRetry == 0 { + s.proposalRetry = proposalReevaluationReason(newPropRetry) + } + // calling maybeSetCorrupt here is mostly for tests and looks. The + // interesting errors originate in applyRaftCommandToBatch, and they are + // already handled above. + pErr = r.maybeSetCorrupt(ctx, pErr) + } + if pErr == nil { + pErr = s.forcedErr + } + + if s.proposedLocally() { + if s.proposalRetry != proposalNoReevaluation && pErr == nil { + log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", s.proposal) + } + if pErr != nil { + // A forced error was set (i.e. we did not apply the proposal, + // for instance due to its log position) or the Replica is now + // corrupted. + // If proposalRetry is set, we don't also return an error, as per the + // proposalResult contract. + if s.proposalRetry == proposalNoReevaluation { + s.response.Err = pErr + } + } else if s.proposal.Local.Reply != nil { + s.response.Reply = s.proposal.Local.Reply + } else { + log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", s.proposal) + } + s.response.Intents = s.proposal.Local.DetachIntents() + s.response.EndTxns = s.proposal.Local.DetachEndTxns(pErr != nil) + if pErr == nil { + s.localResult = s.proposal.Local + } + } + if pErr != nil && s.localResult != nil { + log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr) + } + if log.ExpensiveLogEnabled(ctx, 2) { + log.VEvent(ctx, 2, s.localResult.String()) + } +} + +// finishRaftCommand is called after a command's side effects have been applied +// in order to acknowledge clients and release latches. +func (r *Replica) finishRaftCommand( + ctx context.Context, s *entryApplicationState, +) (changedReplicas bool) { + + // Provide the command's corresponding logical operations to the + // Replica's rangefeed. Only do so if the WriteBatch is nonnil, + // otherwise it's valid for the logical op log to be nil, which + // would shut down all rangefeeds. If no rangefeed is running, + // this call will be a noop. 
+	if s.raftCmd.WriteBatch != nil {
+		r.handleLogicalOpLogRaftMuLocked(ctx, s.raftCmd.LogicalOpLog)
+	} else if s.raftCmd.LogicalOpLog != nil {
+		log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", s.raftCmd)
+	}
+
+	// When set to true, recomputes the stats for the LHS and RHS of splits and
+	// makes sure that they agree with the state's range stats.
+	const expensiveSplitAssertion = false
+
+	if expensiveSplitAssertion && s.rResult().Split != nil {
+		split := s.rResult().Split
+		lhsStatsMS := r.GetMVCCStats()
+		lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		rhsStatsMS := rightReplica.GetMVCCStats()
+		rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 {
+			log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS))
+		}
+		if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 {
+			log.Fatalf(ctx, "RHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS))
+		}
+	}
+
+	if s.proposedLocally() {
+		// If we failed to apply at the right lease index, try again with
+		// a new one. This is important for pipelined writes, since they
+		// don't have a client watching to retry, so a failure to
+		// eventually apply the proposal would be a user-visible error.
+		// TODO(nvanbenschoten): This reproposal is not tracked by the
+		// quota pool. We should fix that.
+		if s.proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(s.proposal) {
+			return false
+		}
+		// Otherwise, signal the command's status to the client.
+		s.proposal.finishApplication(s.response)
+	} else if s.response.Err != nil {
+		log.VEventf(ctx, 1, "applying raft command resulted in error: %s", s.response.Err)
+	}
+
+	return s.rResult().ChangeReplicas != nil
+}
+
+// entryApplicationState stores the state required to apply a single raft
+// entry to a replica. The state is accumulated in stages which occur in
+// Replica.handleCommittedEntriesRaftMuLocked. At a high level, a command is
+// decoded into an entryApplicationBatch, then if it was proposed locally the
+// proposal is populated from the replica's proposals map, then the command
+// is staged into the batch by writing its update to the batch's engine.Batch
+// and applying its "trivial" side-effects to the batch's view of ReplicaState.
+// Then the batch is committed, the side-effects are applied, and the local
+// result is processed.
+type entryApplicationState struct {
+	// ctx is initialized to a non-nil value but may be overwritten with the
+	// proposal's context if this entry was proposed locally.
+	ctx context.Context
+	// e is the Entry being applied.
+	e *raftpb.Entry
+
+	// These four fields are decoded from e.
+	idKey   storagebase.CmdIDKey
+	raftCmd storagepb.RaftCommand
+	cc      raftpb.ConfChange // only populated for conf changes
+	ccCtx   ConfChangeContext // only populated for conf changes
+
+	// proposal is populated on the proposing Replica only and comes from the
+	// Replica's proposal map.
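+	// It remains nil for commands that originated on another replica, which
+	// is what proposedLocally() below reports.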
+	proposal *ProposalData
+
+	// The below fields are set during stageRaftCommand when we validate that
+	// a command applies given the current lease in checkForcedErr.
+	leaseIndex    uint64
+	forcedErr     *roachpb.Error
+	proposalRetry proposalReevaluationReason
+	// splitMergeUnlock is acquired for splits and merges.
+	splitMergeUnlock func(*storagepb.ReplicatedEvalResult)
+
+	// The below fields are set after the data has been written to the storage
+	// engine in prepareLocalResult.
+	localResult *result.LocalResult
+	response    proposalResult
+}
+
+func (s *entryApplicationState) rResult() *storagepb.ReplicatedEvalResult {
+	return &s.raftCmd.ReplicatedEvalResult
+}
+
+func (s *entryApplicationState) proposedLocally() bool {
+	return s.proposal != nil
+}
+
+// decode initializes the entryApplicationState with ctx and e and then
+// decodes the entry into the decoded fields of s.
+func (s *entryApplicationState) decode(
+	ctx context.Context, e *raftpb.Entry,
+) (expl string, err error) {
+	*s = entryApplicationState{ctx: ctx, e: e}
+	// etcd raft sometimes inserts nil commands, ours are never nil.
+	// This case is handled upstream of this call.
+	if len(s.e.Data) == 0 {
+		return "", nil
+	}
+	switch s.e.Type {
+	case raftpb.EntryNormal:
+		return s.decodeNormalEntry(s.e)
+	case raftpb.EntryConfChange:
+		return s.decodeConfChangeEntry(s.e)
+	default:
+		log.Fatalf(s.ctx, "unexpected Raft entry: %v", s.e)
+		return "", nil // unreachable
+	}
+}
+
+func (s *entryApplicationState) decodeNormalEntry(e *raftpb.Entry) (expl string, err error) {
+	var encodedCommand []byte
+	s.idKey, encodedCommand = DecodeRaftCommand(e.Data)
+	// An empty command is used to unquiesce a range and wake the
+	// leader. Clear the command ID so it's ignored for processing.
+	if len(encodedCommand) == 0 {
+		s.idKey = ""
+	} else if err := protoutil.Unmarshal(encodedCommand, &s.raftCmd); err != nil {
+		const expl = "while unmarshalling entry"
+		return expl, errors.Wrap(err, expl)
+	}
+	return "", nil
+}
+
+func (s *entryApplicationState) decodeConfChangeEntry(e *raftpb.Entry) (expl string, err error) {
+	if err := protoutil.Unmarshal(e.Data, &s.cc); err != nil {
+		const expl = "while unmarshaling ConfChange"
+		return expl, errors.Wrap(err, expl)
+	}
+	if err := protoutil.Unmarshal(s.cc.Context, &s.ccCtx); err != nil {
+		const expl = "while unmarshaling ConfChangeContext"
+		return expl, errors.Wrap(err, expl)
+	}
+	if err := protoutil.Unmarshal(s.ccCtx.Payload, &s.raftCmd); err != nil {
+		const expl = "while unmarshaling RaftCommand"
+		return expl, errors.Wrap(err, expl)
+	}
+	s.idKey = storagebase.CmdIDKey(s.ccCtx.CommandID)
+	return "", nil
+}
+
+// entryApplicationBatch accumulates state due to the application of raft
+// commands. Committed raft commands are applied to the batch in a multi-stage
+// process whereby individual commands are decoded, prepared for application
+// relative to the current view of replicaState, committed to the storage
+// engine, applied to the Replica's in-memory state, and then finished by
+// releasing their latches and notifying clients.
+type entryApplicationBatch struct {
+	batch        engine.Batch
+	replicaState storagepb.ReplicaState
+	// stats is stored on the application batch to avoid an allocation in tracking
+	// the batch's view of replicaState. All pointer fields in replicaState other
+	// than Stats are overwritten completely rather than updated in-place.
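+	// (replicaState.Stats above is expected to point at this field while the
+	// batch is in use.)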
+	stats enginepb.MVCCStats
+
+	updatedTruncatedState bool
+	appStates             entryApplicationStateBuf
+}
+
+// entryApplicationBatch structs are needed to apply raft commands, which is to
+// say, frequently, so it's best to pool them rather than allocate them under
+// the raftMu.
+var entryApplicationBatchSyncPool = sync.Pool{
+	New: func() interface{} {
+		return new(entryApplicationBatch)
+	},
+}
+
+func getEntryApplicationBatch() *entryApplicationBatch {
+	return entryApplicationBatchSyncPool.Get().(*entryApplicationBatch)
+}
+
+func releaseEntryApplicationBatch(b *entryApplicationBatch) {
+	b.appStates.destroy()
+	*b = entryApplicationBatch{}
+	entryApplicationBatchSyncPool.Put(b)
+}
+
+// decode decodes commands into the batch until a non-trivial entry is found
+// (which is included as the final command of the batch) or all of toProcess
+// has been added to the batch. sawEmptyEntry is true if the consumed portion
+// of toProcess contained a command with a zero-byte Data field, which
+// indicates that pending proposals need to be resubmitted.
+func (b *entryApplicationBatch) decode(
+	ctx context.Context, toProcess []raftpb.Entry,
+) (sawEmptyEntry bool, remaining []raftpb.Entry, expl string, err error) {
+	for i := range toProcess {
+		e := &toProcess[i]
+		if len(e.Data) == 0 {
+			sawEmptyEntry = true
+		}
+		// Allocate an *entryApplicationState for this entry.
+		s := b.appStates.allocate()
+		if expl, err := s.decode(ctx, e); err != nil {
+			return sawEmptyEntry, nil, expl, err
+		}
+		// This is a non-trivial entry which needs to be processed last in this
+		// batch.
+		if !isTrivial(s.rResult(), b.replicaState.UsingAppliedStateKey) {
+			return sawEmptyEntry, toProcess[i+1:], "", nil
+		}
+	}
+	return sawEmptyEntry, nil, "", nil
+}
+
+// isTrivial determines whether the side-effects of a ReplicatedEvalResult are
+// "trivial", which implies that it can be applied in a batch with other trivial
+// commands. A result is fundamentally considered "trivial" if it does not have
+// side effects which rely on the durable state of the replica exactly matching
+// the in-memory state of the replica at the corresponding log position.
+//
+// At the time of writing it is possible that the current conditions are too
+// strict but they are certainly sufficient.
+func isTrivial(r *storagepb.ReplicatedEvalResult, usingStateAppliedKey bool) bool {
+	return !r.IsLeaseRequest &&
+		!r.BlockReads &&
+		r.Split == nil &&
+		r.Merge == nil &&
+		r.ComputeChecksum == nil &&
+		r.ChangeReplicas == nil &&
+		r.AddSSTable == nil &&
+		(r.State == nil ||
+			(r.State.GCThreshold == nil &&
+				r.State.TxnSpanGCThreshold == nil &&
+				r.State.Lease == nil &&
+				r.State.Desc == nil &&
+				// An entry is non-trivial if an upgrade to UsingAppliedState is
+				// required. If we're already usingStateAppliedKey or this entry does
+				// not imply an upgrade then it is trivial.
+				(usingStateAppliedKey || !r.State.UsingAppliedStateKey)))
+}
+
+func (b *entryApplicationBatch) reset() {
+	b.appStates.truncate()
+	b.batch = nil
+	b.updatedTruncatedState = false
+	// TODO(ajwerner): could clear the replicaState and stats but it's not needed
+	// for correctness.
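+	// They are expected to be re-initialized before the batch is used again.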
+} diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index bac303278ff4..712925cd66fa 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -531,254 +531,6 @@ func addSSTablePreApply( return copied } -func (r *Replica) handleReplicatedEvalResult( - ctx context.Context, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, -) (shouldAssert bool) { - // Fields for which no action is taken in this method are zeroed so that - // they don't trigger an assertion at the end of the method (which checks - // that all fields were handled). - { - rResult.IsLeaseRequest = false - rResult.Timestamp = hlc.Timestamp{} - rResult.PrevLeaseProposal = nil - } - - if rResult.BlockReads { - r.readOnlyCmdMu.Lock() - defer r.readOnlyCmdMu.Unlock() - rResult.BlockReads = false - } - - // Update MVCC stats and Raft portion of ReplicaState. - deltaStats := rResult.Delta.ToStats() - r.mu.Lock() - r.mu.state.Stats.Add(deltaStats) - if raftAppliedIndex != 0 { - r.mu.state.RaftAppliedIndex = raftAppliedIndex - } - if leaseAppliedIndex != 0 { - r.mu.state.LeaseAppliedIndex = leaseAppliedIndex - } - needsSplitBySize := r.needsSplitBySizeRLocked() - needsMergeBySize := r.needsMergeBySizeRLocked() - r.mu.Unlock() - - r.store.metrics.addMVCCStats(deltaStats) - rResult.Delta = enginepb.MVCCStatsDelta{} - - // NB: the bootstrap store has a nil split queue. - // TODO(tbg): the above is probably a lie now. - if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) { - r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - - // The bootstrap store has a nil merge queue. - // TODO(tbg): the above is probably a lie now. - if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { - // TODO(tbg): for ranges which are small but protected from merges by - // other means (zone configs etc), this is called on every command, and - // fires off a goroutine each time. Make this trigger (and potentially - // the split one above, though it hasn't been observed to be as - // bothersome) less aggressive. - r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - - // The above are always present. The following are not always present but - // should not trigger a ReplicaState assertion because they are either too - // frequent to do so or because they do not change the ReplicaState. - - if rResult.State != nil { - // Raft log truncation is too frequent to justify a replica state - // assertion. - if newTruncState := rResult.State.TruncatedState; newTruncState != nil { - rResult.State.TruncatedState = nil // for assertion - - r.mu.Lock() - r.mu.state.TruncatedState = newTruncState - r.mu.Unlock() - - // Clear any entries in the Raft log entry cache for this range up - // to and including the most recently truncated index. - r.store.raftEntryCache.Clear(r.RangeID, newTruncState.Index+1) - - // Truncate the sideloaded storage. Note that this is safe only if the new truncated state - // is durably on disk (i.e.) synced. This is true at the time of writing but unfortunately - // could rot. - { - log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index) - if size, _, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { - // We don't *have* to remove these entries for correctness. Log a - // loud error, but keep humming along. 
- log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err) - } else { - rResult.RaftLogDelta -= size - } - } - } - - // ReplicaState.Stats was previously non-nullable which caused nodes to - // send a zero-value MVCCStats structure. If the proposal was generated by - // an old node, we'll have decoded that zero-value structure setting - // ReplicaState.Stats to a non-nil value which would trigger the "unhandled - // field in ReplicatedEvalResult" assertion to fire if we didn't clear it. - if rResult.State.Stats != nil && (*rResult.State.Stats == enginepb.MVCCStats{}) { - rResult.State.Stats = nil - } - - if rResult.State.UsingAppliedStateKey { - r.mu.Lock() - // If we're already using the AppliedStateKey then there's nothing - // to do. This flag is idempotent so it's ok that we see this flag - // multiple times, but we want to make sure it doesn't cause us to - // perform repeated state assertions, so clear it before the - // shouldAssert determination. - if r.mu.state.UsingAppliedStateKey { - rResult.State.UsingAppliedStateKey = false - } - r.mu.Unlock() - } - - if (*rResult.State == storagepb.ReplicaState{}) { - rResult.State = nil - } - } - - if rResult.RaftLogDelta != 0 { - r.mu.Lock() - r.mu.raftLogSize += rResult.RaftLogDelta - r.mu.raftLogLastCheckSize += rResult.RaftLogDelta - // Ensure raftLog{,LastCheck}Size is not negative since it isn't persisted - // between server restarts. - if r.mu.raftLogSize < 0 { - r.mu.raftLogSize = 0 - } - if r.mu.raftLogLastCheckSize < 0 { - r.mu.raftLogLastCheckSize = 0 - } - r.mu.Unlock() - rResult.RaftLogDelta = 0 - } else { - // Check for whether to queue the range for Raft log truncation if this is - // not a Raft log truncation command itself. We don't want to check the - // Raft log for truncation on every write operation or even every operation - // which occurs after the Raft log exceeds RaftLogQueueStaleSize. The logic - // below queues the replica for possible Raft log truncation whenever an - // additional RaftLogQueueStaleSize bytes have been written to the Raft - // log. - r.mu.Lock() - checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize - if checkRaftLog { - r.mu.raftLogLastCheckSize = r.mu.raftLogSize - } - r.mu.Unlock() - if checkRaftLog { - r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - } - - for _, sc := range rResult.SuggestedCompactions { - r.store.compactor.Suggest(ctx, sc) - } - rResult.SuggestedCompactions = nil - - // The rest of the actions are "nontrivial" and may have large effects on the - // in-memory and on-disk ReplicaStates. If any of these actions are present, - // we want to assert that these two states do not diverge. - shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) - - // Process Split or Merge. This needs to happen after stats update because - // of the ContainsEstimates hack. - - if rResult.Split != nil { - splitPostApply( - r.AnnotateCtx(ctx), - rResult.Split.RHSDelta, - &rResult.Split.SplitTrigger, - r, - ) - rResult.Split = nil - } - - if rResult.Merge != nil { - if err := r.store.MergeRange( - ctx, r, rResult.Merge.LeftDesc, rResult.Merge.RightDesc, rResult.Merge.FreezeStart, - ); err != nil { - // Our in-memory state has diverged from the on-disk state. - log.Fatalf(ctx, "failed to update store after merging range: %s", err) - } - rResult.Merge = nil - } - - // Update the remaining ReplicaState. 
- - if rResult.State != nil { - if newDesc := rResult.State.Desc; newDesc != nil { - r.setDesc(ctx, newDesc) - rResult.State.Desc = nil - } - - if newLease := rResult.State.Lease; newLease != nil { - r.leasePostApply(ctx, *newLease, false /* permitJump */) - rResult.State.Lease = nil - } - - if newThresh := rResult.State.GCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.GCThreshold = newThresh - r.mu.Unlock() - } - rResult.State.GCThreshold = nil - } - - if newThresh := rResult.State.TxnSpanGCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.TxnSpanGCThreshold = newThresh - r.mu.Unlock() - } - rResult.State.TxnSpanGCThreshold = nil - } - - if rResult.State.UsingAppliedStateKey { - r.mu.Lock() - r.mu.state.UsingAppliedStateKey = true - r.mu.Unlock() - rResult.State.UsingAppliedStateKey = false - } - - if (*rResult.State == storagepb.ReplicaState{}) { - rResult.State = nil - } - } - - if change := rResult.ChangeReplicas; change != nil { - if change.ChangeType == roachpb.REMOVE_REPLICA && - r.store.StoreID() == change.Replica.StoreID { - // This wants to run as late as possible, maximizing the chances - // that the other nodes have finished this command as well (since - // processing the removal from the queue looks up the Range at the - // lease holder, being too early here turns this into a no-op). - // Lock ordering dictates that we don't hold any mutexes when adding, - // so we fire it off in a task. - r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) - } - rResult.ChangeReplicas = nil - } - - if rResult.ComputeChecksum != nil { - r.computeChecksumPostApply(ctx, *rResult.ComputeChecksum) - rResult.ComputeChecksum = nil - } - - if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { - log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) - } - return shouldAssert -} - func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.LocalResult) { // Fields for which no action is taken in this method are zeroed so that // they don't trigger an assertion at the end of the method (which checks @@ -858,25 +610,6 @@ func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.Loca } } -func (r *Replica) handleEvalResultRaftMuLocked( - ctx context.Context, - lResult *result.LocalResult, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, -) { - shouldAssert := r.handleReplicatedEvalResult(ctx, rResult, raftAppliedIndex, leaseAppliedIndex) - if lResult != nil { - r.handleLocalEvalResult(ctx, *lResult) - } - if shouldAssert { - // Assert that the on-disk state doesn't diverge from the in-memory - // state as a result of the side effects. - r.mu.Lock() - r.assertStateLocked(ctx, r.store.Engine()) - r.mu.Unlock() - } -} - // proposalResult indicates the result of a proposal. Exactly one of // Reply and Err is set, and it represents the result of the proposal. 
type proposalResult struct { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index e55999d4ca7a..049102d8c01d 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -20,10 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -37,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/kr/pretty" "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -699,92 +695,19 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) r.sendRaftMessages(ctx, otherMsgs) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") - applicationStart := timeutil.Now() - for _, e := range rd.CommittedEntries { - switch e.Type { - case raftpb.EntryNormal: - // NB: Committed entries are handed to us by Raft. Raft does not - // know about sideloading. Consequently the entries here are all - // already inlined. - - var commandID storagebase.CmdIDKey - var command storagepb.RaftCommand - - // Process committed entries. etcd raft occasionally adds a nil entry - // (our own commands are never empty). This happens in two situations: - // When a new leader is elected, and when a config change is dropped due - // to the "one at a time" rule. In both cases we may need to resubmit our - // pending proposals (In the former case we resubmit everything because - // we proposed them to a former leader that is no longer able to commit - // them. In the latter case we only need to resubmit pending config - // changes, but it's hard to distinguish so we resubmit everything - // anyway). We delay resubmission until after we have processed the - // entire batch of entries. - if len(e.Data) == 0 { - // Overwrite unconditionally since this is the most aggressive - // reproposal mode. - if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange { - refreshReason = reasonNewLeaderOrConfigChange - } - commandID = "" // special-cased value, command isn't used - } else { - var encodedCommand []byte - commandID, encodedCommand = DecodeRaftCommand(e.Data) - // An empty command is used to unquiesce a range and wake the - // leader. Clear commandID so it's ignored for processing. 
- if len(encodedCommand) == 0 { - commandID = "" - } else if err := protoutil.Unmarshal(encodedCommand, &command); err != nil { - const expl = "while unmarshalling entry" - return stats, expl, errors.Wrap(err, expl) - } - } - - if changedRepl := r.processRaftCommand(ctx, commandID, e.Term, e.Index, command); changedRepl { - log.Fatalf(ctx, "unexpected replication change from command %s", &command) - } - r.store.metrics.RaftCommandsApplied.Inc(1) - stats.processed++ - - case raftpb.EntryConfChange: - var cc raftpb.ConfChange - if err := protoutil.Unmarshal(e.Data, &cc); err != nil { - const expl = "while unmarshaling ConfChange" - return stats, expl, errors.Wrap(err, expl) - } - var ccCtx ConfChangeContext - if err := protoutil.Unmarshal(cc.Context, &ccCtx); err != nil { - const expl = "while unmarshaling ConfChangeContext" - return stats, expl, errors.Wrap(err, expl) - } - var command storagepb.RaftCommand - if err := protoutil.Unmarshal(ccCtx.Payload, &command); err != nil { - const expl = "while unmarshaling RaftCommand" - return stats, expl, errors.Wrap(err, expl) - } - commandID := storagebase.CmdIDKey(ccCtx.CommandID) - if changedRepl := r.processRaftCommand( - ctx, commandID, e.Term, e.Index, command, - ); !changedRepl { - // If we did not apply the config change, tell raft that the config change was aborted. - cc = raftpb.ConfChange{} - } - stats.processed++ - - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { - raftGroup.ApplyConfChange(cc) - return true, nil - }); err != nil { - const expl = "during ApplyConfChange" - return stats, expl, errors.Wrap(err, expl) - } - default: - log.Fatalf(ctx, "unexpected Raft entry: %v", e) + applicationStart := timeutil.Now() + if len(rd.CommittedEntries) > 0 { + var expl string + refreshReason, expl, err = r.handleCommittedEntriesRaftMuLocked(ctx, + rd.CommittedEntries, &stats, refreshReason) + if err != nil { + return stats, expl, err } } applicationElapsed := timeutil.Since(applicationStart).Nanoseconds() r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed) + if refreshReason != noReason { r.mu.Lock() r.refreshProposalsLocked(0, refreshReason) @@ -1196,151 +1119,6 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID } } -func (r *Replica) checkForcedErrLocked( - ctx context.Context, - idKey storagebase.CmdIDKey, - raftCmd storagepb.RaftCommand, - proposal *ProposalData, - proposedLocally bool, -) (uint64, proposalReevaluationReason, *roachpb.Error) { - leaseIndex := r.mu.state.LeaseAppliedIndex - - isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest - var requestedLease roachpb.Lease - if isLeaseRequest { - requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease - } - if idKey == "" { - // This is an empty Raft command (which is sent by Raft after elections - // to trigger reproposals or during concurrent configuration changes). - // Nothing to do here except making sure that the corresponding batch - // (which is bogus) doesn't get executed (for it is empty and so - // properties like key range are undefined). - return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") - } - - // Verify the lease matches the proposer's expectation. We rely on - // the proposer's determination of whether the existing lease is - // held, and can be used, or is expired, and can be replaced. - // Verify checks that the lease has not been modified since proposal - // due to Raft delays / reorderings. 
- // To understand why this lease verification is necessary, see comments on the - // proposer_lease field in the proto. - leaseMismatch := false - if raftCmd.DeprecatedProposerLease != nil { - // VersionLeaseSequence must not have been active when this was proposed. - // - // This does not prevent the lease race condition described below. The - // reason we don't fix this here as well is because fixing the race - // requires a new cluster version which implies that we'll already be - // using lease sequence numbers and will fall into the case below. - leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*r.mu.state.Lease) - } else { - leaseMismatch = raftCmd.ProposerLeaseSequence != r.mu.state.Lease.Sequence - if !leaseMismatch && isLeaseRequest { - // Lease sequence numbers are a reflection of lease equivalency - // between subsequent leases. However, Lease.Equivalent is not fully - // symmetric, meaning that two leases may be Equivalent to a third - // lease but not Equivalent to each other. If these leases are - // proposed under that same third lease, neither will be able to - // detect whether the other has applied just by looking at the - // current lease sequence number because neither will will increment - // the sequence number. - // - // This can lead to inversions in lease expiration timestamps if - // we're not careful. To avoid this, if a lease request's proposer - // lease sequence matches the current lease sequence and the current - // lease sequence also matches the requested lease sequence, we make - // sure the requested lease is Equivalent to current lease. - if r.mu.state.Lease.Sequence == requestedLease.Sequence { - // It is only possible for this to fail when expiration-based - // lease extensions are proposed concurrently. - leaseMismatch = !r.mu.state.Lease.Equivalent(requestedLease) - } - - // This is a check to see if the lease we proposed this lease request against is the same - // lease that we're trying to update. We need to check proposal timestamps because - // extensions don't increment sequence numbers. Without this check a lease could - // be extended and then another lease proposed against the original lease would - // be applied over the extension. - if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && - (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *r.mu.state.Lease.ProposedTS) { - leaseMismatch = true - } - } - } - if leaseMismatch { - log.VEventf( - ctx, 1, - "command proposed from replica %+v with lease #%d incompatible to %v", - raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *r.mu.state.Lease, - ) - if isLeaseRequest { - // For lease requests we return a special error that - // redirectOnOrAcquireLease() understands. Note that these - // requests don't go through the DistSender. - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *r.mu.state.Lease, - Requested: requestedLease, - Message: "proposed under invalid lease", - }) - } - // We return a NotLeaseHolderError so that the DistSender retries. - nlhe := newNotLeaseHolderError( - r.mu.state.Lease, raftCmd.ProposerReplica.StoreID, r.mu.state.Desc) - nlhe.CustomMsg = fmt.Sprintf( - "stale proposal: command was proposed under lease #%d but is being applied "+ - "under lease: %s", raftCmd.ProposerLeaseSequence, r.mu.state.Lease) - return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) - } - - if isLeaseRequest { - // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). 
This - // makes sense since lease commands are proposed by anyone, so we can't expect a coherent - // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the - // counter makes sense from a testing perspective. - // - // However, leases get special vetting to make sure we don't give one to a replica that was - // since removed (see #15385 and a comment in redirectOnOrAcquireLease). - if _, ok := r.mu.state.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *r.mu.state.Lease, - Requested: requestedLease, - Message: "replica not part of range", - }) - } - } else if r.mu.state.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { - // The happy case: the command is applying at or ahead of the minimal - // permissible index. It's ok if it skips a few slots (as can happen - // during rearrangement); this command will apply, but later ones which - // were proposed at lower indexes may not. Overall though, this is more - // stable and simpler than requiring commands to apply at their exact - // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 - // is otherwise tricky since we can't tell the client to try again - // (reproposals could exist and may apply at the right index, leading - // to a replay), and assigning the required index would be tedious - // seeing that it would have to rewind sometimes. - leaseIndex = raftCmd.MaxLeaseIndex - } else { - // The command is trying to apply at a past log position. That's - // unfortunate and hopefully rare; the client on the proposer will try - // again. Note that in this situation, the leaseIndex does not advance. - retry := proposalNoReevaluation - if proposedLocally { - log.VEventf( - ctx, 1, - "retry proposal %x: applied at lease index %d, required < %d", - proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, - ) - retry = proposalIllegalLeaseIndex - } - return leaseIndex, retry, roachpb.NewErrorf( - "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, - ) - } - return leaseIndex, proposalNoReevaluation, nil -} - type snapTruncationInfo struct { index uint64 deadline time.Time @@ -1591,383 +1369,6 @@ func (m lastUpdateTimesMap) isFollowerActive( return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration } -// processRaftCommand handles the complexities involved in moving the Raft -// state of a Replica forward. At a high level, it receives a proposal, which -// contains the evaluation of a batch (at its heart a WriteBatch, to be applied -// to the underlying storage engine), which it applies and for which it signals -// the client waiting for it (if it's waiting on this Replica). -// -// The proposal also contains auxiliary data which needs to be verified in order -// to decide whether the proposal should be applied: the command's MaxLeaseIndex -// must move the state machine's LeaseAppliedIndex forward, and the proposer's -// lease (or rather its sequence number) must match that of the state machine. -// Furthermore, the GCThreshold is validated and it is checked whether the -// request's key span is contained in the Replica's (it is unclear whether all -// of these checks are necessary). If any of the checks fail, the proposal's -// content is wiped and we apply an empty log entry instead, returning an error -// to the caller to handle. 
The two typical cases are the lease mismatch (in -// which case the caller tries to send the command to the actual leaseholder) -// and violations of the LeaseAppliedIndex (in which the caller tries again). -// -// Assuming all checks were passed, the command should be applied to the engine, -// which is done by the aptly named applyRaftCommand. -// -// For simple proposals this is the whole story, but some commands trigger -// additional code in this method. The standard way in which this is triggered -// is via a side effect communicated in the proposal's ReplicatedEvalResult and, -// for local proposals, the LocalEvalResult. These might, for example, trigger -// an update of the Replica's in-memory state to match updates to the on-disk -// state, or pass intents to the intent resolver. Some commands don't fit this -// simple schema and need to hook deeper into the code. Notably splits and merges -// need to acquire locks on their right-hand side Replicas and may need to add -// data to the WriteBatch before it is applied; similarly, changes to the disk -// layout of internal state typically require a migration which shows up here. -// -// This method returns true if the command successfully applied a replica -// change. -func (r *Replica) processRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - term, raftIndex uint64, - raftCmd storagepb.RaftCommand, -) (changedRepl bool) { - if raftIndex == 0 { - log.Fatalf(ctx, "processRaftCommand requires a non-zero index") - } - - if log.V(4) { - log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", idKey, raftCmd.MaxLeaseIndex) - } - - var ts hlc.Timestamp - if idKey != "" { - ts = raftCmd.ReplicatedEvalResult.Timestamp - } - - r.mu.Lock() - proposal, proposedLocally := r.mu.proposals[idKey] - - // TODO(tschottdorf): consider the Trace situation here. - if proposedLocally { - // We initiated this command, so use the caller-supplied context. - ctx = proposal.ctx - delete(r.mu.proposals, idKey) - - // At this point we're not guaranteed to have proposalQuota initialized, - // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized. - if r.mu.proposalQuota != nil { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, proposal.quotaSize) - } - } - - leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally) - - r.mu.Unlock() - - if forcedErr == nil { - // Verify that the batch timestamp is after the GC threshold. This is - // necessary because not all commands declare read access on the GC - // threshold key, even though they implicitly depend on it. This means - // that access to this state will not be serialized by latching, - // so we must perform this check upstream and downstream of raft. - // See #14833. - // - // We provide an empty key span because we already know that the Raft - // command is allowed to apply within its key range. This is guaranteed - // by checks upstream of Raft, which perform the same validation, and by - // span latches, which assure that any modifications to the range's - // boundaries will be serialized with this command. Finally, the - // leaseAppliedIndex check in checkForcedErrLocked ensures that replays - // outside of the spanlatch manager's control which break this - // serialization ordering will already by caught and an error will be - // thrown. 
- forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts)) - } - - // applyRaftCommand will return "expected" errors, but may also indicate - // replica corruption (as of now, signaled by a replicaCorruptionError). - // We feed its return through maybeSetCorrupt to act when that happens. - if forcedErr != nil { - log.VEventf(ctx, 1, "applying command with forced error: %s", forcedErr) - } else { - log.Event(ctx, "applying command") - - if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, raftCmd); err != nil { - log.Eventf(ctx, "unable to acquire split lock: %s", err) - // Send a crash report because a former bug in the error handling might have - // been the root cause of #19172. - _ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) { - log.SendCrashReport( - ctx, - &r.store.cfg.Settings.SV, - 0, // depth - "while acquiring split lock: %s", - []interface{}{err}, - log.ReportTypeError, - ) - }) - - forcedErr = roachpb.NewError(err) - } else if splitMergeUnlock != nil { - // Close over raftCmd to capture its value at execution time; we clear - // ReplicatedEvalResult on certain errors. - defer func() { - splitMergeUnlock(raftCmd.ReplicatedEvalResult) - }() - } - } - - var response proposalResult - var writeBatch *storagepb.WriteBatch - { - if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil { - var newPropRetry int - newPropRetry, forcedErr = filter(storagebase.ApplyFilterArgs{ - CmdID: idKey, - ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if proposalRetry == 0 { - proposalRetry = proposalReevaluationReason(newPropRetry) - } - } - - if forcedErr != nil { - // Apply an empty entry. - raftCmd.ReplicatedEvalResult = storagepb.ReplicatedEvalResult{} - raftCmd.WriteBatch = nil - raftCmd.LogicalOpLog = nil - } - - // Update the node clock with the serviced request. This maintains - // a high water mark for all ops serviced, so that received ops without - // a timestamp specified are guaranteed one higher than any op already - // executed for overlapping keys. - r.store.Clock().Update(ts) - - var pErr *roachpb.Error - if raftCmd.WriteBatch != nil { - writeBatch = raftCmd.WriteBatch - } - - if deprecatedDelta := raftCmd.ReplicatedEvalResult.DeprecatedDelta; deprecatedDelta != nil { - raftCmd.ReplicatedEvalResult.Delta = deprecatedDelta.ToStatsDelta() - raftCmd.ReplicatedEvalResult.DeprecatedDelta = nil - } - - // AddSSTable ingestions run before the actual batch. This makes sure - // that when the Raft command is applied, the ingestion has definitely - // succeeded. Note that we have taken precautions during command - // evaluation to avoid having mutations in the WriteBatch that affect - // the SSTable. Not doing so could result in order reversal (and missing - // values) here. If the key range we are ingesting into isn't empty, - // we're not using AddSSTable but a plain WriteBatch. 
- if raftCmd.ReplicatedEvalResult.AddSSTable != nil { - copied := addSSTablePreApply( - ctx, - r.store.cfg.Settings, - r.store.engine, - r.raftMu.sideloaded, - term, - raftIndex, - *raftCmd.ReplicatedEvalResult.AddSSTable, - r.store.limiters.BulkIOWriteRate, - ) - r.store.metrics.AddSSTableApplications.Inc(1) - if copied { - r.store.metrics.AddSSTableApplicationCopies.Inc(1) - } - raftCmd.ReplicatedEvalResult.AddSSTable = nil - } - - if raftCmd.ReplicatedEvalResult.Split != nil { - // Splits require a new HardState to be written to the new RHS - // range (and this needs to be atomic with the main batch). This - // cannot be constructed at evaluation time because it differs - // on each replica (votes may have already been cast on the - // uninitialized replica). Transform the write batch to add the - // updated HardState. - // See https://github.com/cockroachdb/cockroach/issues/20629 - // - // This is not the most efficient, but it only happens on splits, - // which are relatively infrequent and don't write much data. - tmpBatch := r.store.engine.NewBatch() - if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - log.Fatal(ctx, err) - } - splitPreApply(ctx, tmpBatch, raftCmd.ReplicatedEvalResult.Split.SplitTrigger) - writeBatch.Data = tmpBatch.Repr() - tmpBatch.Close() - } - - if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil { - // Merges require the subsumed range to be atomically deleted when the - // merge transaction commits. - // - // This is not the most efficient, but it only happens on merges, - // which are relatively infrequent and don't write much data. - tmpBatch := r.store.engine.NewBatch() - if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - log.Fatal(ctx, err) - } - rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - const destroyData = false - err = rhsRepl.preDestroyRaftMuLocked(ctx, tmpBatch, tmpBatch, merge.RightDesc.NextReplicaID, destroyData) - if err != nil { - log.Fatal(ctx, err) - } - writeBatch.Data = tmpBatch.Repr() - tmpBatch.Close() - } - - { - var err error - raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand( - ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch) - - // applyRaftCommand returned an error, which usually indicates - // either a serious logic bug in CockroachDB or a disk - // corruption/out-of-space issue. Make sure that these fail with - // descriptive message so that we can differentiate the root causes. - if err != nil { - log.Errorf(ctx, "unable to update the state machine: %s", err) - // Report the fatal error separately and only with the error, as that - // triggers an optimization for which we directly report the error to - // sentry (which in turn allows sentry to distinguish different error - // types). - log.Fatal(ctx, err) - } - } - - if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil { - var newPropRetry int - newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{ - CmdID: idKey, - ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if proposalRetry == 0 { - proposalRetry = proposalReevaluationReason(newPropRetry) - } - - } - - // calling maybeSetCorrupt here is mostly for tests and looks. The - // interesting errors originate in applyRaftCommand, and they are - // already handled above. 
- pErr = r.maybeSetCorrupt(ctx, pErr) - if pErr == nil { - pErr = forcedErr - } - - var lResult *result.LocalResult - if proposedLocally { - if proposalRetry != proposalNoReevaluation && pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) - } - if pErr != nil { - // A forced error was set (i.e. we did not apply the proposal, - // for instance due to its log position) or the Replica is now - // corrupted. - // If proposalRetry is set, we don't also return an error, as per the - // proposalResult contract. - if proposalRetry == proposalNoReevaluation { - response.Err = pErr - } - } else if proposal.Local.Reply != nil { - response.Reply = proposal.Local.Reply - } else { - log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal) - } - response.Intents = proposal.Local.DetachIntents() - response.EndTxns = proposal.Local.DetachEndTxns(pErr != nil) - if pErr == nil { - lResult = proposal.Local - } - } - if pErr != nil && lResult != nil { - log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr) - } - if log.ExpensiveLogEnabled(ctx, 2) { - log.VEvent(ctx, 2, lResult.String()) - } - - // Handle the Result, executing any side effects of the last - // state machine transition. - // - // Note that this must happen after committing (the engine.Batch), but - // before notifying a potentially waiting client. - r.handleEvalResultRaftMuLocked(ctx, lResult, - raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex) - - // Provide the command's corresponding logical operations to the - // Replica's rangefeed. Only do so if the WriteBatch is non-nil, - // otherwise it's valid for the logical op log to be nil, which - // would shut down all rangefeeds. If no rangefeed is running, - // this call will be a no-op. - if raftCmd.WriteBatch != nil { - r.handleLogicalOpLogRaftMuLocked(ctx, raftCmd.LogicalOpLog) - } else if raftCmd.LogicalOpLog != nil { - log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", raftCmd) - } - } - - // When set to true, recomputes the stats for the LHS and RHS of splits and - // makes sure that they agree with the state's range stats. - const expensiveSplitAssertion = false - - if expensiveSplitAssertion && raftCmd.ReplicatedEvalResult.Split != nil { - split := raftCmd.ReplicatedEvalResult.Split - lhsStatsMS := r.GetMVCCStats() - lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos) - if err != nil { - log.Fatal(ctx, err) - } - - rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - - rhsStatsMS := rightReplica.GetMVCCStats() - rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos) - if err != nil { - log.Fatal(ctx, err) - } - - if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 { - log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS)) - } - if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 { - log.Fatalf(ctx, "RHS split stats divergence diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS)) - } - } - - if proposedLocally { - // If we failed to apply at the right lease index, try again with - // a new one. This is important for pipelined writes, since they - // don't have a client watching to retry, so a failure to - // eventually apply the proposal would be a user-visible error. 
- // TODO(nvanbenschoten): This reproposal is not tracked by the - // quota pool. We should fix that. - if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { - return false - } - // Otherwise, signal the command's status to the client. - proposal.finishApplication(response) - } else if response.Err != nil { - log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err) - } - - return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil -} - // tryReproposeWithNewLeaseIndex is used by processRaftCommand to // repropose commands that have gotten an illegal lease index error, // and that we know could not have applied while their lease index was @@ -2073,7 +1474,7 @@ func (r *Replica) maybeAcquireSnapshotMergeLock( // applying the command to perform any necessary cleanup. func (r *Replica) maybeAcquireSplitMergeLock( ctx context.Context, raftCmd storagepb.RaftCommand, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { if split := raftCmd.ReplicatedEvalResult.Split; split != nil { return r.acquireSplitLock(ctx, &split.SplitTrigger) } else if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil { @@ -2084,7 +1485,7 @@ func (r *Replica) maybeAcquireSplitMergeLock( func (r *Replica) acquireSplitLock( ctx context.Context, split *roachpb.SplitTrigger, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { rightRng, created, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil) if err != nil { return nil, err @@ -2101,7 +1502,7 @@ func (r *Replica) acquireSplitLock( // commands that have reproposals interacting with retries (i.e. we don't // treat splits differently). - return func(rResult storagepb.ReplicatedEvalResult) { + return func(rResult *storagepb.ReplicatedEvalResult) { if rResult.Split == nil && created && !rightRng.IsInitialized() { // An error occurred during processing of the split and the RHS is still // uninitialized. Mark the RHS destroyed and remove it from the replica's @@ -2127,7 +1528,7 @@ func (r *Replica) acquireSplitLock( func (r *Replica) acquireMergeLock( ctx context.Context, merge *roachpb.MergeTrigger, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { // The merge lock is the right-hand replica's raftMu. The right-hand replica // is required to exist on this store. Otherwise, an incoming snapshot could // create the right-hand replica before the merge trigger has a chance to @@ -2145,199 +1546,11 @@ func (r *Replica) acquireMergeLock( log.Fatalf(ctx, "RHS of merge %s <- %s not present on store; found %s in place of the RHS", merge.LeftDesc, merge.RightDesc, rightDesc) } - return func(storagepb.ReplicatedEvalResult) { + return func(*storagepb.ReplicatedEvalResult) { rightRepl.raftMu.Unlock() }, nil } -// applyRaftCommand applies a raft command from the replicated log to the -// underlying state machine (i.e. the engine). When the state machine can not be -// updated, an error (which is likely fatal!) is returned and must be handled by -// the caller. -// The returned ReplicatedEvalResult replaces the caller's. 
-func (r *Replica) applyRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, - writeBatch *storagepb.WriteBatch, -) (storagepb.ReplicatedEvalResult, error) { - if writeBatch != nil && len(writeBatch.Data) > 0 { - // Record the write activity, passing a 0 nodeID because replica.writeStats - // intentionally doesn't track the origin of the writes. - mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) - if err != nil { - log.Errorf(ctx, "unable to read header of committed WriteBatch: %s", err) - } else { - r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) - } - } - - r.mu.Lock() - usingAppliedStateKey := r.mu.state.UsingAppliedStateKey - oldRaftAppliedIndex := r.mu.state.RaftAppliedIndex - oldLeaseAppliedIndex := r.mu.state.LeaseAppliedIndex - oldTruncatedState := r.mu.state.TruncatedState - - // Exploit the fact that a split will result in a full stats - // recomputation to reset the ContainsEstimates flag. - // - // TODO(tschottdorf): We want to let the usual MVCCStats-delta - // machinery update our stats for the left-hand side. But there is no - // way to pass up an MVCCStats object that will clear out the - // ContainsEstimates flag. We should introduce one, but the migration - // makes this worth a separate effort (ContainsEstimates would need to - // have three possible values, 'UNCHANGED', 'NO', and 'YES'). - // Until then, we're left with this rather crude hack. - if rResult.Split != nil { - r.mu.state.Stats.ContainsEstimates = false - } - ms := *r.mu.state.Stats - r.mu.Unlock() - - if raftAppliedIndex != oldRaftAppliedIndex+1 { - // If we have an out of order index, there's corruption. No sense in - // trying to update anything or running the command. Simply return - // a corruption error. - return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d", - oldRaftAppliedIndex, raftAppliedIndex) - } - - haveTruncatedState := rResult.State != nil && rResult.State.TruncatedState != nil - var batch engine.Batch - if !haveTruncatedState { - batch = r.store.Engine().NewWriteOnlyBatch() - } else { - // When we update the truncated state, we may need to read the batch - // and can't use a WriteOnlyBatch. This is fine since log truncations - // are tiny batches. - batch = r.store.Engine().NewBatch() - } - defer batch.Close() - - if writeBatch != nil { - if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch") - } - } - - // The only remaining use of the batch is for range-local keys which we know - // have not been previously written within this batch. - writer := batch.Distinct() - - // Special-cased MVCC stats handling to exploit commutativity of stats delta - // upgrades. Thanks to commutativity, the spanlatch manager does not have to - // serialize on the stats key. - deltaStats := rResult.Delta.ToStats() - - if !usingAppliedStateKey && rResult.State != nil && rResult.State.UsingAppliedStateKey { - // The Raft command wants us to begin using the RangeAppliedState key - // and we haven't performed the migration yet. Delete the old keys - // that this new key is replacing. 
- err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state") - } - usingAppliedStateKey = true - } - - if usingAppliedStateKey { - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms.Add(deltaStats) - - // Set the range applied state, which includes the last applied raft and - // lease index along with the mvcc stats, all in one key. - if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer, - raftAppliedIndex, leaseAppliedIndex, &ms); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state") - } - } else { - // Advance the last applied index. We use a blind write in order to avoid - // reading the previous applied index keys on every write operation. This - // requires a little additional work in order maintain the MVCC stats. - var appliedIndexNewMS enginepb.MVCCStats - if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS, - raftAppliedIndex, leaseAppliedIndex); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index") - } - deltaStats.SysBytes += appliedIndexNewMS.SysBytes - - r.raftMu.stateLoader.CalcAppliedIndexSysBytes(oldRaftAppliedIndex, oldLeaseAppliedIndex) - - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms.Add(deltaStats) - if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats") - } - } - - if haveTruncatedState { - apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer) - if err != nil { - return storagepb.ReplicatedEvalResult{}, err - } - if !apply { - // The truncated state was discarded, so make sure we don't apply - // it to our in-memory state. - rResult.State.TruncatedState = nil - rResult.RaftLogDelta = 0 - // We received a truncation that doesn't apply to us, so we know that - // there's a leaseholder out there with a log that has earlier entries - // than ours. That leader also guided our log size computations by - // giving us RaftLogDeltas for past truncations, and this was likely - // off. Mark our Raft log size is not trustworthy so that, assuming - // we step up as leader at some point in the future, we recompute - // our numbers. - r.mu.Lock() - r.mu.raftLogSizeTrusted = false - r.mu.Unlock() - } - } - - // TODO(peter): We did not close the writer in an earlier version of - // the code, which went undetected even though we used the batch after - // (though only to commit it). We should add an assertion to prevent that in - // the future. 
- writer.Close() - - start := timeutil.Now() - - var assertHS *raftpb.HardState - if util.RaceEnabled && rResult.Split != nil { - rsl := stateloader.Make(rResult.Split.RightDesc.RangeID) - oldHS, err := rsl.LoadHardState(ctx, r.store.Engine()) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState") - } - assertHS = &oldHS - } - if err := batch.Commit(false); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "could not commit batch") - } - - if assertHS != nil { - // Load the HardState that was just committed (if any). - rsl := stateloader.Make(rResult.Split.RightDesc.RangeID) - newHS, err := rsl.LoadHardState(ctx, r.store.Engine()) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState") - } - // Assert that nothing moved "backwards". - if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) { - log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s", - pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS)) - } - } - - elapsed := timeutil.Since(start) - r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) - rResult.Delta = deltaStats.ToStatsDelta() - return rResult, nil -} - // handleTruncatedStateBelowRaft is called when a Raft command updates the truncated // state. This isn't 100% trivial for two reasons: // - in 19.1 we're making the TruncatedState key unreplicated, so there's a migration diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 99d9c9b9a230..a92ff73d3504 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -900,8 +900,9 @@ func TestLeaseReplicaNotInDesc(t *testing.T) { }, } tc.repl.mu.Lock() - _, _, pErr := tc.repl.checkForcedErrLocked( + _, _, pErr := checkForcedErr( context.Background(), makeIDKey(), raftCmd, nil /* proposal */, false, /* proposedLocally */ + &tc.repl.mu.state, ) tc.repl.mu.Unlock() if _, isErr := pErr.GetDetail().(*roachpb.LeaseRejectedError); !isErr { diff --git a/pkg/storage/track_raft_protos.go b/pkg/storage/track_raft_protos.go index 313ab5cac8d8..23c1028bd867 100644 --- a/pkg/storage/track_raft_protos.go +++ b/pkg/storage/track_raft_protos.go @@ -34,7 +34,7 @@ func funcName(f interface{}) string { // instrumentation and returns the list of downstream-of-raft protos. func TrackRaftProtos() func() []reflect.Type { // Grab the name of the function that roots all raft operations. - processRaftFunc := funcName((*Replica).processRaftCommand) + stageRaftFunc := funcName((*Replica).stageRaftCommand) // We only need to track protos that could cause replica divergence // by being written to disk downstream of raft. whitelist := []string{ @@ -104,7 +104,7 @@ func TrackRaftProtos() func() []reflect.Type { break } - if strings.Contains(f.Function, processRaftFunc) { + if strings.Contains(f.Function, stageRaftFunc) { belowRaftProtos.Lock() belowRaftProtos.inner[t] = struct{}{} belowRaftProtos.Unlock()