kvserver: use must in replica_raft.go
Epic: none
Release note: None
erikgrinaker committed Jul 10, 2023
1 parent 89acb1d commit 9d2c60c
Showing 5 changed files with 82 additions and 83 deletions.
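This commit converts fatal assertions (log.Fatalf) along the Raft ready-handling path in replica_raft.go to the new pkg/util/must helpers, which return assertion-failure errors for callers to propagate. As a result, reportSnapshotStatus, deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked, and maybeAcquireSnapshotMergeLock gain an error return, and their callers in raft_snapshot_queue.go and replica_command.go are updated to check it. The implementation of pkg/util/must is not part of this diff; the sketch below only illustrates the contract implied by the call sites (a context, the value or condition under test, then a printf-style message) and assumes it is built on cockroachdb/errors assertion failures — the real package presumably also logs and can fatal under test builds.

```go
package must

import (
	"context"

	"github.com/cockroachdb/errors"
)

// True returns an assertion-failure error when cond is false, instead of
// crashing the process the way log.Fatalf does. Callers propagate the error
// up the stack. The ctx parameter is unused in this sketch; the real helper
// presumably uses it for logging.
func True(ctx context.Context, cond bool, format string, args ...interface{}) error {
	if !cond {
		return errors.AssertionFailedf(format, args...)
	}
	return nil
}

// NoError asserts that err is nil, wrapping it as an assertion failure
// otherwise.
func NoError(ctx context.Context, err error, format string, args ...interface{}) error {
	if err != nil {
		return errors.NewAssertionErrorWithWrappedErrf(err, format, args...)
	}
	return nil
}
```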
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -205,6 +205,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/mon",
"//pkg/util/must",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
24 changes: 0 additions & 24 deletions pkg/kv/kvserver/logstore/sideload.go
@@ -224,30 +224,6 @@ func MaybeInlineSideloadedRaftCommand(
return &ent, nil
}

// AssertSideloadedRaftCommandInlined asserts that if the provided entry is a
// sideloaded entry, then its payload has already been inlined. Doing so
// requires unmarshalling the raft command, so this assertion should be kept out
// of performance critical paths.
func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) {
typ, err := raftlog.EncodingOf(*ent)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
if !typ.IsSideloaded() {
return
}

e, err := raftlog.NewEntry(*ent)
if err != nil {
log.Fatalf(ctx, "%v", err)
}

if len(e.Cmd.ReplicatedEvalResult.AddSSTable.Data) == 0 {
// The entry is "thin", which is what this assertion is checking for.
log.Fatalf(ctx, "found thin sideloaded raft command: %+v", e.Cmd)
}
}

// maybePurgeSideloaded removes [firstIndex, ..., lastIndex] at the given term
// and returns the total number of bytes removed. Nonexistent entries are
// silently skipped over.
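The removed AssertSideloadedRaftCommandInlined helper is not gone: its check is recreated inline in sendRaftMessages (see the replica_raft.go hunk below) under must.Expensive, which takes over the old `if util.RaceEnabled` guard around the expensive per-entry assertions. A hedged sketch of such a gate, assuming it keys off the race-build flag (the actual pkg/util/must implementation is not shown in this diff):

```go
package must

import "github.com/cockroachdb/cockroach/pkg/util"

// Expensive runs costly assertion closures only in builds where expensive
// checks are enabled. This sketch assumes the gate is the race detector
// build flag, mirroring the `if util.RaceEnabled` condition it replaces in
// sendRaftMessages.
func Expensive(fn func()) {
	if util.RaceEnabled {
		fn()
	}
}
```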
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/raft_snapshot_queue.go
@@ -140,8 +140,10 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
// described above, this will cause raft to keep asking for a snap and at
// some point the snapshot lock above will be released and we'll fall
// through to the logic below.
repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err)
return false, nil
if reportErr := repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err); reportErr != nil {
return false, reportErr
}
return false, nil
}
}

4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_command.go
@@ -2782,7 +2782,9 @@ func (r *Replica) sendSnapshotUsingDelegate(
defer func() {
// Report the snapshot status to Raft, which expects us to do this once we
// finish sending the snapshot.
r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr)
if err := r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr); err != nil {
retErr = err
}
}()

r.mu.RLock()
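The hunk above relies on a Go detail worth noting: retErr is a named return value, so the deferred closure can overwrite it after the function body has returned, letting a failure from reportSnapshotStatus surface to the caller of sendSnapshotUsingDelegate (and, if both fail, the report error replaces the original one). A minimal standalone illustration with hypothetical names (report, send):

```go
package main

import (
	"errors"
	"fmt"
)

// report stands in for reportSnapshotStatus, which can now fail.
func report() error { return errors.New("snapshot status report failed") }

// send mirrors sendSnapshotUsingDelegate: the deferred closure replaces the
// named return value retErr when reporting fails, even though the function
// body itself returned nil.
func send() (retErr error) {
	defer func() {
		if err := report(); err != nil {
			retErr = err
		}
	}()
	return nil
}

func main() {
	fmt.Println(send()) // snapshot status report failed
}
```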
130 changes: 74 additions & 56 deletions pkg/kv/kvserver/replica_raft.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/must"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -253,12 +254,14 @@ func (r *Replica) evalAndPropose(
default:
}
proposal.command.ProposerLeaseSequence = seq
} else if !st.Lease.OwnedBy(r.store.StoreID()) {
} else {
// Perform a sanity check that the lease is owned by this replica. This must
// have been ascertained by the callers in
// checkExecutionCanProceedBeforeStorageSnapshot.
log.Fatalf(ctx, "cannot propose %s on follower with remotely owned lease %s", ba, st.Lease)
} else {
if err := must.True(ctx, st.Lease.OwnedBy(r.store.StoreID()),
"cannot propose %s on follower with remotely owned lease %s", ba, st.Lease); err != nil {
return nil, nil, "", nil, kvpb.NewError(err)
}
proposal.command.ProposerLeaseSequence = st.Lease.Sequence
}

@@ -767,9 +770,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
) (stats handleRaftReadyStats, _ error) {
// handleRaftReadyRaftMuLocked is not prepared to handle context cancellation,
// so assert that it's given a non-cancellable context.
if ctx.Done() != nil {
return handleRaftReadyStats{}, errors.AssertionFailedf(
"handleRaftReadyRaftMuLocked cannot be called with a cancellable context")
if err := must.Zero(ctx, ctx.Done(),
"handleRaftReadyRaftMuLocked called with a cancellable context"); err != nil {
return handleRaftReadyStats{}, err
}

// NB: we need to reference the named return parameter here. If `stats` were
@@ -799,7 +802,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
leaderID := r.mu.leaderID
lastLeaderID := leaderID
err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)
if err := r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup); err != nil {
return false, err
}

numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
if err != nil {
@@ -916,25 +921,24 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

if hasMsg(msgStorageAppend) {
if msgStorageAppend.Snapshot != nil {
if inSnap.Desc == nil {
// If we didn't expect Raft to have a snapshot but it has one
// regardless, that is unexpected and indicates a programming
// error.
return stats, errors.AssertionFailedf(
"have inSnap=nil, but raft has a snapshot %s",
raft.DescribeSnapshot(*msgStorageAppend.Snapshot),
)
// If we didn't expect Raft to have a snapshot but it has one regardless,
// that is unexpected and indicates a programming error.
if err := must.NotNil(ctx, inSnap.Desc, "have inSnap=nil, but raft has a snapshot %s",
raft.DescribeSnapshot(*msgStorageAppend.Snapshot)); err != nil {
return stats, err
}

snapUUID, err := uuid.FromBytes(msgStorageAppend.Snapshot.Data)
if err != nil {
return stats, errors.Wrap(err, "invalid snapshot id")
return stats, errors.Wrap(err, "invalid snapshot")
}
if inSnap.SnapUUID == (uuid.UUID{}) {
log.Fatalf(ctx, "programming error: a snapshot application was attempted outside of the streaming snapshot codepath")
if err := must.NotZero(ctx, inSnap.SnapUUID,
"snapshot application attempted outside streaming snapshot codepath"); err != nil {
return stats, err
}
if snapUUID != inSnap.SnapUUID {
log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID)
if err := must.Equal(ctx, snapUUID, inSnap.SnapUUID,
"incoming snapshot id doesn't match raft snapshot id"); err != nil {
return stats, err
}

snap := *msgStorageAppend.Snapshot
@@ -943,16 +947,20 @@
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
}
if len(msgStorageAppend.Entries) != 0 {
log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot")
if err := must.Empty(ctx, msgStorageAppend.Entries,
"found Entries in MsgStorageAppend with non-empty Snapshot"); err != nil {
return stats, err
}

// Applying this snapshot may require us to subsume one or more of our right
// neighbors. This occurs if this replica is informed about the merges via a
// Raft snapshot instead of a MsgApp containing the merge commits, e.g.,
// because it went offline before the merge commits applied and did not come
// back online until after the merge commits were truncated away.
subsumedRepls, releaseMergeLock := r.maybeAcquireSnapshotMergeLock(ctx, inSnap)
subsumedRepls, releaseMergeLock, err := r.maybeAcquireSnapshotMergeLock(ctx, inSnap)
if err != nil {
return stats, err
}
defer releaseMergeLock()

stats.tSnapBegin = timeutil.Now()
@@ -988,8 +996,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
r.sendRaftMessages(ctx, msgStorageAppend.Responses, nil /* blocked */, true /* willDeliverLocal */)
} else {
// TODO(pavelkalinnikov): find a way to move it to storeEntries.
if msgStorageAppend.Commit != 0 && !r.IsInitialized() {
log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s", r)
if err := must.False(ctx, msgStorageAppend.Commit != 0 && !r.IsInitialized(),
"setting non-zero HardState.Commit on uninitialized replica"); err != nil {
return stats, err
}
// TODO(pavelkalinnikov): construct and store this in Replica.
// TODO(pavelkalinnikov): fields like raftEntryCache are the same across all
@@ -1113,7 +1122,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

r.mu.Lock()
err = r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)
if err := r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup); err != nil {
return false, err
}

if stats.apply.numConfChangeEntries > 0 {
// If the raft leader got removed, campaign on the leaseholder. Uses
@@ -1385,9 +1396,8 @@ const (
func (r *Replica) refreshProposalsLocked(
ctx context.Context, refreshAtDelta int, reason refreshRaftReason,
) {
if refreshAtDelta != 0 && reason != reasonTicks {
log.Fatalf(ctx, "refreshAtDelta specified for reason %s != reasonTicks", reason)
}
_ = must.False(ctx, refreshAtDelta != 0 && reason != reasonTicks,
"refreshAtDelta specified for reason %s != reasonTicks", reason)

var maxSlowProposalDurationRequest *kvpb.BatchRequest
// TODO(tbg): don't track exempt requests for tripping the breaker?
@@ -1625,7 +1635,7 @@ func (r *Replica) sendRaftMessages(
}
switch message.Type {
case raftpb.MsgApp:
if util.RaceEnabled {
must.Expensive(func() {
// Iterate over the entries to assert that all sideloaded commands
// are already inlined. replicaRaftStorage.Entries already performs
// the sideload inlining for stable entries and raft.unstable always
Expand All @@ -1642,25 +1652,26 @@ func (r *Replica) sendRaftMessages(
prevIndex := message.Index // index of entry preceding the append
for j := range message.Entries {
ent := &message.Entries[j]
logstore.AssertSideloadedRaftCommandInlined(ctx, ent)

if prevIndex+1 != ent.Index {
log.Fatalf(ctx,
"index gap in outgoing MsgApp: idx %d followed by %d",
prevIndex, ent.Index,
)
}
prevIndex = ent.Index
if prevTerm > ent.Term {
log.Fatalf(ctx,
"term regression in outgoing MsgApp: idx %d at term=%d "+
"appended with logterm=%d",
ent.Index, ent.Term, message.LogTerm,
)
typ, err := raftlog.EncodingOf(*ent)
_ = must.NoError(ctx, err, "failed to detect entry encoding")
if typ.IsSideloaded() {
e, err := raftlog.NewEntry(*ent)
_ = must.NoError(ctx, err, "failed to populate entry")
_ = must.NotEmpty(ctx, e.Cmd.ReplicatedEvalResult.AddSSTable.Data,
"found thin sideloaded raft command: %+v", e.Cmd)
}

_ = must.Equal(ctx, prevIndex+1, ent.Index,
"index gap in outgoing MsgApp: idx %d followed by %d", prevIndex, ent.Index)
_ = must.LessOrEqual(ctx, prevTerm, ent.Term,
"term regression in outgoing MsgApp: idx %d at term=%d appended with logterm=%d",
ent.Index, ent.Term, message.LogTerm)

prevTerm = ent.Term
prevIndex = ent.Index
}
}
})

case raftpb.MsgAppResp:
// A successful (non-reject) MsgAppResp contains one piece of
@@ -1718,7 +1729,7 @@ func (r *Replica) sendLocalRaftMsg(msg raftpb.Message, willDeliverLocal bool) {
// the provided raw node.
func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(
ctx context.Context, raftGroup *raft.RawNode,
) {
) error {
r.raftMu.AssertHeld()
r.mu.AssertHeld()
r.localMsgs.Lock()
@@ -1741,15 +1752,16 @@

for i, m := range localMsgs {
if err := raftGroup.Step(m); err != nil {
log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v",
raftDescribeMessage(m, raftEntryFormatter), err)
return errors.Wrapf(err, "unexpected error stepping local raft message [%s]",
raftDescribeMessage(m, raftEntryFormatter))
}
// NB: we can reset messages in the localMsgs.recycled slice without holding
// the localMsgs mutex because no-one ever writes to localMsgs.recycled and
// we are holding raftMu, which must be held to switch localMsgs.active and
// localMsgs.recycled.
localMsgs[i].Reset() // for GC
}
return nil
}

// sendRaftMessage sends a Raft message.
@@ -1842,7 +1854,9 @@ func (r *Replica) sendRaftMessageRequest(
return r.store.cfg.Transport.SendAsync(req, r.connectionClass.get())
}

func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID, snapErr error) {
func (r *Replica) reportSnapshotStatus(
ctx context.Context, to roachpb.ReplicaID, snapErr error,
) error {
r.raftMu.Lock()
defer r.raftMu.Unlock()

@@ -1867,8 +1881,9 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
raftGroup.ReportSnapshot(uint64(to), snapStatus)
return true, nil
}); err != nil && !errors.Is(err, errRemoved) {
log.Fatalf(ctx, "%v", err)
return err
}
return nil
}

type snapTruncationInfo struct {
@@ -2440,7 +2455,7 @@ func (m lastUpdateTimesMap) isFollowerActiveSince(
// for details about the lock itself.
func (r *Replica) maybeAcquireSnapshotMergeLock(
ctx context.Context, inSnap IncomingSnapshot,
) (subsumedRepls []*Replica, releaseMergeLock func()) {
) (subsumedRepls []*Replica, releaseMergeLock func(), err error) {
// Any replicas that overlap with the bounds of the incoming snapshot are ours
// to subsume; further, the end of the last overlapping replica will exactly
// align with the end of the snapshot. How are we guaranteed this? Each merge
@@ -2454,22 +2469,25 @@
if endKey == nil {
// The existing replica is uninitialized, in which case we've already
// installed a placeholder for snapshot's keyspace. No merge lock needed.
return nil, func() {}
return nil, func() {}, nil
}
for endKey.Less(inSnap.Desc.EndKey) {
sRepl := r.store.LookupReplica(endKey)
if sRepl == nil || !endKey.Equal(sRepl.Desc().StartKey) {
log.Fatalf(ctx, "snapshot widens existing replica, but no replica exists for subsumed key %s", endKey)
if err = must.True(ctx, sRepl != nil && endKey.Equal(sRepl.Desc().StartKey),
"snapshot widens existing replica, but no replica exists for subsumed key %s", endKey); err != nil {
return nil, nil, err
}
sRepl.raftMu.Lock()
subsumedRepls = append(subsumedRepls, sRepl)
endKey = sRepl.Desc().EndKey
}
for _, sRepl := range subsumedRepls {
sRepl.raftMu.Lock()
}
return subsumedRepls, func() {
for _, sr := range subsumedRepls {
sr.raftMu.Unlock()
}
}
}, nil
}

// maybeAcquireSplitMergeLock examines the given raftCmd (which need
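Taken together, the transformation in replica_raft.go is mechanical: each log.Fatalf assertion becomes an early return of an assertion error, and every function on the path grows an error return that its callers must check. A hedged before/after sketch of the pattern, using hypothetical names (doWork, n) rather than code from this diff:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/must"
)

// Before: an invariant violation crashes the node.
func doWorkFatal(ctx context.Context, n int) {
	if n < 0 {
		log.Fatalf(ctx, "negative input: %d", n)
	}
	// ... work ...
}

// After: the violation becomes an assertion error that the caller decides
// how to handle, typically by propagating it up the stack.
func doWork(ctx context.Context, n int) error {
	if err := must.True(ctx, n >= 0, "negative input: %d", n); err != nil {
		return err
	}
	// ... work ...
	return nil
}
```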