Skip to content

Commit

Permalink
storage: s/replica.mu.proposals/replica.mu.localProposals/
Browse files Browse the repository at this point in the history
The reverse of 192a828.

Release note: None
  • Loading branch information
nvanbenschoten committed Nov 26, 2018
1 parent 762fc5c commit 62bb20b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
40 changes: 20 additions & 20 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,15 @@ type Replica struct {
minBytes int64
// Max bytes before split.
maxBytes int64
// localProposals stores the Raft in-flight commands which originated at
// proposals stores the Raft in-flight commands which originated at
// this Replica, i.e. all commands for which propose has been called,
// but which have not yet applied.
//
// The *ProposalData in the map are "owned" by it. Elements from the
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
localProposals map[storagebase.CmdIDKey]*ProposalData
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
// been created from a preemptive snapshot (i.e. before being added to the
Expand Down Expand Up @@ -698,7 +698,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
r.cmdQMu.queues[spanset.SpanLocal] = NewCommandQueue(false /* optimizeOverlap */)
r.cmdQMu.Unlock()

r.mu.localProposals = map[storagebase.CmdIDKey]*ProposalData{}
r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
// Clear the internal raft group in case we're being reset. Since we're
// reloading the raft state below, it isn't safe to use the existing raft
Expand Down Expand Up @@ -865,7 +865,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb

func (r *Replica) cancelPendingCommandsLocked() {
r.mu.AssertHeld()
for _, p := range r.mu.localProposals {
for _, p := range r.mu.proposals {
r.cleanupFailedProposalLocked(p)
// NB: each proposal needs its own version of the error (i.e. don't try to
// share the error across proposals).
Expand All @@ -881,7 +881,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// Clear the proposal from the proposals map. May be a no-op if the
// proposal has not yet been inserted into the map.
delete(r.mu.localProposals, p.idKey)
delete(r.mu.proposals, p.idKey)
// Release associated quota pool resources if we have been tracking
// this command.
//
Expand Down Expand Up @@ -1883,7 +1883,7 @@ func (r *Replica) State() storagebase.RangeInfo {
var ri storagebase.RangeInfo
ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState)
ri.LastIndex = r.mu.lastIndex
ri.NumPending = uint64(len(r.mu.localProposals))
ri.NumPending = uint64(len(r.mu.proposals))
ri.RaftLogSize = r.mu.raftLogSize
ri.NumDropped = uint64(r.mu.droppedMessages)
if r.mu.proposalQuota != nil {
Expand Down Expand Up @@ -3522,11 +3522,11 @@ func (r *Replica) insertProposalLocked(
proposal.idKey, proposal.command.MaxLeaseIndex)
}

if _, ok := r.mu.localProposals[proposal.idKey]; ok {
if _, ok := r.mu.proposals[proposal.idKey]; ok {
ctx := r.AnnotateCtx(context.TODO())
log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey)
}
r.mu.localProposals[proposal.idKey] = proposal
r.mu.proposals[proposal.idKey] = proposal
if isLease {
// For lease requests, we return zero because no real MaxLeaseIndex is assigned.
// We could also return the lastAssignedIndex but this invites confusion.
Expand Down Expand Up @@ -3750,7 +3750,7 @@ func (r *Replica) propose(
}

// Must not use `proposal` in the closure below as a proposal which is not
// present in r.mu.localProposals is no longer protected by the mutex. Abandoning
// present in r.mu.proposals is no longer protected by the mutex. Abandoning
// a command only abandons the associated context. As soon as we propose a
// command to Raft, ownership passes to the "below Raft" machinery. In
// particular, endCmds will be invoked when the command is applied. There are
Expand All @@ -3759,7 +3759,7 @@ func (r *Replica) propose(
// range.
tryAbandon := func() bool {
r.mu.Lock()
p, ok := r.mu.localProposals[idKey]
p, ok := r.mu.proposals[idKey]
if ok {
// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
// We'd need to make sure the span is finished eventually.
Expand All @@ -3771,7 +3771,7 @@ func (r *Replica) propose(
return proposalCh, tryAbandon, maxLeaseIndex, nil
}

// submitProposalLocked proposes or re-proposes a command in r.mu.localProposals.
// submitProposalLocked proposes or re-proposes a command in r.mu.proposals.
// The replica lock must be held.
func (r *Replica) submitProposalLocked(p *ProposalData) error {
p.proposedAtTicks = r.mu.ticks
Expand Down Expand Up @@ -3886,9 +3886,9 @@ func (r *Replica) quiesce() bool {

func (r *Replica) quiesceLocked() bool {
ctx := r.AnnotateCtx(context.TODO())
if len(r.mu.localProposals) != 0 {
if len(r.mu.proposals) != 0 {
if log.V(3) {
log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.localProposals))
log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals))
}
return false
}
Expand Down Expand Up @@ -4514,7 +4514,7 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
// correctness issues.
func (r *Replica) maybeQuiesceLocked(livenessMap map[roachpb.NodeID]bool) bool {
ctx := r.AnnotateCtx(context.TODO())
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.localProposals), livenessMap)
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
if !ok {
return false
}
Expand Down Expand Up @@ -4766,7 +4766,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR

numShouldRetry := 0
var reproposals pendingCmdSlice
for _, p := range r.mu.localProposals {
for _, p := range r.mu.proposals {
if p.command.MaxLeaseIndex == 0 {
// Commands without a MaxLeaseIndex cannot be reproposed, as they might
// apply twice. We also don't want to ask the proposer to retry these
Expand All @@ -4781,7 +4781,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
} else if cannotApplyAnyMore := !p.command.ReplicatedEvalResult.IsLeaseRequest &&
p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex; cannotApplyAnyMore {
// The command's designated lease index slot was filled up. We got to
// LeaseAppliedIndex and p is still pending in r.mu.localProposals; generally
// LeaseAppliedIndex and p is still pending in r.mu.proposals; generally
// this means that proposal p didn't commit, and it will be sent back to
// the proposer for a retry - the request needs to be re-evaluated and the
// command re-proposed with a new MaxLeaseIndex. Note that this branch is not
Expand All @@ -4792,7 +4792,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
// reasonSnapshotApplied - in that case we don't know if p or some other
// command filled the p.command.MaxLeaseIndex slot (i.e. p might have been
// applied, but we're not watching for every proposal when applying a
// snapshot, so nobody removed p from r.mu.localProposals). In this
// snapshot, so nobody removed p from r.mu.proposals). In this
// ambiguous case, we'll also send the command back to the proposer for a
// retry, but the proposer needs to be aware that, if the retry fails, an
// AmbiguousResultError needs to be returned to the higher layers.
Expand Down Expand Up @@ -4853,7 +4853,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
// that they can make it in the right place. Reproposing in order is
// definitely required, however.
//
// TODO(tschottdorf): evaluate whether `r.mu.localProposals` should
// TODO(tschottdorf): evaluate whether `r.mu.proposals` should
// be a list/slice.
sort.Sort(reproposals)
for _, p := range reproposals {
Expand Down Expand Up @@ -5233,14 +5233,14 @@ func (r *Replica) processRaftCommand(
}

r.mu.Lock()
proposal, proposedLocally := r.mu.localProposals[idKey]
proposal, proposedLocally := r.mu.proposals[idKey]

// TODO(tschottdorf): consider the Trace situation here.
if proposedLocally {
// We initiated this command, so use the caller-supplied context.
ctx = proposal.ctx
proposal.ctx = nil // avoid confusion
delete(r.mu.localProposals, idKey)
delete(r.mu.proposals, idKey)
}

leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (r *Replica) maybeSideloadEntriesRaftMuLocked(
maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
r.mu.Lock()
defer r.mu.Unlock()
cmd, ok := r.mu.localProposals[cmdID]
cmd, ok := r.mu.proposals[cmdID]
if ok {
return *cmd.command, true
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7601,7 +7601,7 @@ func TestReplicaTryAbandon(t *testing.T) {
func() {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
if len(tc.repl.mu.localProposals) == 0 {
if len(tc.repl.mu.proposals) == 0 {
t.Fatal("expected non-empty proposals map")
}
}()
Expand Down Expand Up @@ -8157,7 +8157,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
}

tc.repl.mu.Lock()
for _, p := range tc.repl.mu.localProposals {
for _, p := range tc.repl.mu.proposals {
if v := p.ctx.Value(magicKey{}); v != nil {
origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
}
Expand Down Expand Up @@ -8189,13 +8189,13 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {

tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
nonePending := len(tc.repl.mu.localProposals) == 0
nonePending := len(tc.repl.mu.proposals) == 0
c := int(tc.repl.mu.lastAssignedLeaseIndex) - int(tc.repl.mu.state.LeaseAppliedIndex)
if nonePending && c > 0 {
t.Errorf("no pending cmds, but have required index offset %d", c)
}
if !nonePending {
t.Fatalf("still pending commands: %+v", tc.repl.mu.localProposals)
t.Fatalf("still pending commands: %+v", tc.repl.mu.proposals)
}
}

Expand Down Expand Up @@ -8353,7 +8353,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
}
// Build the map of expected reproposals at this stage.
m := map[storagebase.CmdIDKey]int{}
for id, p := range r.mu.localProposals {
for id, p := range r.mu.proposals {
m[id] = p.proposedAtTicks
}
r.mu.Unlock()
Expand Down

0 comments on commit 62bb20b

Please sign in to comment.