Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: fix lock ordering in Replica.applySnapshot #60504

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,9 +907,9 @@ func TestGCQueueTransactionTable(t *testing.T) {
batch := tc.engine.NewSnapshot()
defer batch.Close()
tc.repl.raftMu.Lock()
tc.repl.mu.Lock()
tc.repl.assertStateLocked(ctx, batch) // check that in-mem and on-disk state were updated
tc.repl.mu.Unlock()
tc.repl.mu.RLock()
tc.repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, batch) // check that in-mem and on-disk state were updated
tc.repl.mu.RUnlock()
tc.repl.raftMu.Unlock()
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func NewTestStorePool(cfg StoreConfig) *StorePool {
func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
r.assertStateLocked(ctx, reader)
r.mu.RLock()
defer r.mu.RUnlock()
r.assertStateRaftMuLockedReplicaMuRLocked(ctx, reader)
}

func (r *Replica) RaftLock() {
Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,10 +1134,12 @@ func (r *Replica) State() kvserverpb.RangeInfo {
return ri
}

// assertStateLocked can be called from the Raft goroutine to check that the
// in-memory and on-disk states of the Replica are congruent.
// Requires that both r.raftMu and r.mu are held.
func (r *Replica) assertStateLocked(ctx context.Context, reader storage.Reader) {
// assertStateRaftMuLockedReplicaMuRLocked can be called from the Raft goroutine
// to check that the in-memory and on-disk states of the Replica are congruent.
// Requires that r.raftMu is locked and r.mu is read locked.
func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
ctx context.Context, reader storage.Reader,
) {
diskState, err := r.mu.stateLoader.Load(ctx, reader, r.mu.state.Desc)
if err != nil {
log.Fatalf(ctx, "%v", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,9 +1045,9 @@ func (sm *replicaStateMachine) ApplySideEffects(
if shouldAssert {
// Assert that the on-disk state doesn't diverge from the in-memory
// state as a result of the side effects.
sm.r.mu.Lock()
sm.r.assertStateLocked(ctx, sm.r.store.Engine())
sm.r.mu.Unlock()
sm.r.mu.RLock()
sm.r.assertStateRaftMuLockedReplicaMuRLocked(ctx, sm.r.store.Engine())
sm.r.mu.RUnlock()
sm.stats.stateAssertions++
}
} else if res := cmd.replicatedResult(); !res.IsZero() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
); err != nil {
return errors.Wrap(err, "while initializing sideloaded storage")
}
r.assertStateLocked(ctx, r.store.Engine())
r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())
return nil
}

Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,15 +969,18 @@ func (r *Replica) applySnapshot(
// consume the SSTs above, meaning that at this point the in-memory state lags
// the on-disk state.

r.mu.Lock()
r.store.mu.Lock()
r.mu.Lock()
if r.store.removePlaceholderLocked(ctx, r.RangeID) {
atomic.AddInt32(&r.store.counts.filledPlaceholders, 1)
}
r.setDescLockedRaftMuLocked(ctx, s.Desc)
if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
}
// NOTE: even though we acquired the store mutex first (according to the
// lock ordering rules described on Store.mu), it is safe to drop it first
// without risking a lock-ordering deadlock.
r.store.mu.Unlock()

// Invoke the leasePostApply method to ensure we properly initialize the
Expand Down Expand Up @@ -1010,9 +1013,16 @@ func (r *Replica) applySnapshot(
// Snapshots typically have fewer log entries than the leaseholder. The next
// time we hold the lease, recompute the log size before making decisions.
r.mu.raftLogSizeTrusted = false
r.assertStateLocked(ctx, r.store.Engine())
r.mu.Unlock()

// Assert that the in-memory and on-disk states of the Replica are congruent
// after the application of the snapshot. Do so under a read lock, as this
// operation can be expensive. This is safe, as we hold the Replica.raftMu
// across both Replica.mu critical sections.
r.mu.RLock()
r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())
r.mu.RUnlock()

// The rangefeed processor is listening for the logical ops attached to
// each raft command. These will be lost during a snapshot, so disconnect
// the rangefeed, if one exists.
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,9 @@ func (tc *testContext) addBogusReplicaToRangeDesc(

tc.repl.setDescRaftMuLocked(ctx, &newDesc)
tc.repl.raftMu.Lock()
tc.repl.mu.Lock()
tc.repl.assertStateLocked(ctx, tc.engine)
tc.repl.mu.Unlock()
tc.repl.mu.RLock()
tc.repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, tc.engine)
tc.repl.mu.RUnlock()
tc.repl.raftMu.Unlock()
return secondReplica, nil
}
Expand Down Expand Up @@ -9964,7 +9964,7 @@ func TestReplicaRecomputeStats(t *testing.T) {
disturbMS.ContainsEstimates = 0
ms.Add(*disturbMS)
err := repl.raftMu.stateLoader.SetMVCCStats(ctx, tc.engine, ms)
repl.assertStateLocked(ctx, tc.engine)
repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, tc.engine)
repl.mu.Unlock()
repl.raftMu.Unlock()

Expand Down