Skip to content

Commit

Permalink
kvserver: use combined store/repl critical section in applySnapshot
Browse files Browse the repository at this point in the history
Fixes #58378.

Release note: None
  • Loading branch information
tbg committed Feb 3, 2021
1 parent e920176 commit ad78116
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 26 deletions.
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,19 +964,22 @@ func (r *Replica) applySnapshot(
}

// Atomically swap the placeholder, if any, for the replica, and update the
// replica's descriptor.
// replica's state. Note that this is intentionally in one critical section.
// to avoid exposing an inconsistent in-memory state. We did however already
// consume the SSTs above, meaning that at this point the in-memory state lags
// the on-disk state.

r.mu.Lock()
r.store.mu.Lock()
if r.store.removePlaceholderLocked(ctx, r.RangeID) {
atomic.AddInt32(&r.store.counts.filledPlaceholders, 1)
}
r.setDescRaftMuLocked(ctx, s.Desc)
if err := r.store.maybeMarkReplicaInitializedLocked(ctx, r); err != nil {
r.setDescLockedRaftMuLocked(ctx, s.Desc)
if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
}
r.store.mu.Unlock()

r.mu.Lock()
// Invoke the leasePostApply method to ensure we properly initialize the
// replica according to whether it holds the lease. We allow jumps in the
// lease sequence because there may be multiple lease changes accounted for
Expand Down
21 changes: 12 additions & 9 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,25 +311,28 @@ func (s *Store) addReplicaToRangeMapLocked(repl *Replica) error {
// unintialized replica has become initialized so that the store can update its
// internal bookkeeping. It requires that Store.mu and Replica.raftMu
// are locked.
func (s *Store) maybeMarkReplicaInitializedLocked(ctx context.Context, repl *Replica) error {
if !repl.IsInitialized() {
return errors.Errorf("attempted to process uninitialized range %s", repl)
func (s *Store) maybeMarkReplicaInitializedLockedReplLocked(
ctx context.Context, lockedRepl *Replica,
) error {
desc := lockedRepl.descRLocked()
if !desc.IsInitialized() {
return errors.Errorf("attempted to process uninitialized range %s", desc)
}

rangeID := repl.RangeID
repl.startKey = repl.Desc().StartKey
rangeID := lockedRepl.RangeID
lockedRepl.startKey = desc.StartKey

if _, ok := s.mu.uninitReplicas[rangeID]; !ok {
// Do nothing if the range has already been initialized.
return nil
}
delete(s.mu.uninitReplicas, rangeID)

if it := s.getOverlappingKeyRangeLocked(repl.Desc()); it.item != nil {
return errors.Errorf("%s: cannot initialize replica; range %s has overlapping range %s",
s, repl, it.Desc())
if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil {
return errors.Errorf("%s: cannot initialize replica; %s has overlapping range %s",
s, desc, it.Desc())
}
if it := s.mu.replicasByKey.ReplaceOrInsertReplica(ctx, repl); it.item != nil {
if it := s.mu.replicasByKey.ReplaceOrInsertReplica(ctx, lockedRepl); it.item != nil {
return errors.Errorf("range for key %v already exists in replicasByKey btree: %+v",
it.item.key(), it)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_replica_btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestStoreReplicaBTree_LookupPrecedingReplica(t *testing.T) {
desc.EndKey = roachpb.RKey(end)
r := &Replica{}
r.mu.state.Desc = desc
r.startKey = desc.StartKey // this is what's actually used in btree
r.startKey = desc.StartKey // this is what's actually used in the btree
return r
}

Expand Down
30 changes: 21 additions & 9 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,13 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) {

expectedResult := "attempted to process uninitialized range.*"
ctx := r.AnnotateCtx(context.Background())
if err := store.maybeMarkReplicaInitializedLocked(ctx, r); !testutils.IsError(err, expectedResult) {
t.Errorf("expected maybeMarkReplicaInitializedLocked with uninitialized replica to fail, got %v", err)
}
func() {
r.mu.Lock()
defer r.mu.Unlock()
if err := store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); !testutils.IsError(err, expectedResult) {
t.Errorf("expected maybeMarkReplicaInitializedLocked with uninitialized replica to fail, got %v", err)
}
}()

// Initialize the range with start and end keys.
desc = protoutil.Clone(desc).(*roachpb.RangeDescriptor)
Expand All @@ -869,16 +873,24 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) {
}}
desc.NextReplicaID = 2
r.setDescRaftMuLocked(ctx, desc)
if err := store.maybeMarkReplicaInitializedLocked(ctx, r); err != nil {
t.Errorf("expected maybeMarkReplicaInitializedLocked on a replica that's not in the uninit map to silently succeed, got %v", err)
}
func() {
r.mu.Lock()
defer r.mu.Unlock()
if err := store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
t.Errorf("expected maybeMarkReplicaInitializedLocked on a replica that's not in the uninit map to silently succeed, got %v", err)
}
}()

store.mu.uninitReplicas[newRangeID] = r

expectedResult = ".*cannot initialize replica.*"
if err := store.maybeMarkReplicaInitializedLocked(ctx, r); !testutils.IsError(err, expectedResult) {
t.Errorf("expected maybeMarkReplicaInitializedLocked with overlapping keys to fail, got %v", err)
}
func() {
r.mu.Lock()
defer r.mu.Unlock()
if err := store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); !testutils.IsError(err, expectedResult) {
t.Errorf("expected maybeMarkReplicaInitializedLocked with overlapping keys to fail, got %v", err)
}
}()
}

// TestStoreSend verifies straightforward command execution
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,11 @@ SELECT node_id, network, regexp_replace(address, '\d+$', '<port>') as address, a
node_id network address attrs locality server_version
1 tcp 127.0.0.1:<port> [] region=test,dc=dc1 <server_version>

query IITBBT colnames
SELECT node_id, epoch, regexp_replace(expiration, '^\d+\.\d+,\d+$', '<timestamp>') as expiration, draining, decommissioning, membership FROM crdb_internal.gossip_liveness WHERE node_id = 1
query ITTBBT colnames
SELECT node_id, regexp_replace(epoch::string, '^\d+$', '<epoch>') as epoch, regexp_replace(expiration, '^\d+\.\d+,\d+$', '<timestamp>') as expiration, draining, decommissioning, membership FROM crdb_internal.gossip_liveness WHERE node_id = 1
----
node_id epoch expiration draining decommissioning membership
1 1 <timestamp> false false active
1 <epoch> <timestamp> false false active

query ITTTTTT colnames
SELECT node_id, network, regexp_replace(address, '\d+$', '<port>') as address, attrs, locality, regexp_replace(server_version, '^\d+\.\d+(-\d+)?$', '<server_version>') as server_version, regexp_replace(go_version, '^go.+$', '<go_version>') as go_version
Expand Down

0 comments on commit ad78116

Please sign in to comment.