Skip to content

Commit

Permalink
Fix a race between splits and snapshots.
Browse files Browse the repository at this point in the history
When a range is split, followers of that range may receive a snapshot
from the right-hand side of the split before they have caught up and
processed the left-hand side where the split originated. This results in
a "range already exists" panic.

The solution is to silently drop any snapshots which would cause a
conflict. They will be retried and will succeed once the left-hand range
has performed its split.

Fixes cockroachdb#1644.

Also check destination stopper in multiTestContext.rpcSend
  • Loading branch information
bdarnell committed Nov 3, 2015
1 parent bf9c895 commit 37cb585
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 3 deletions.
9 changes: 9 additions & 0 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,18 @@ func (s *state) handleMessage(req *RaftMessageRequest) {
case raftpb.MsgHeartbeat:
s.fanoutHeartbeat(req)
return

case raftpb.MsgHeartbeatResp:
s.fanoutHeartbeatResponse(req)
return

case raftpb.MsgSnap:
if !s.Storage.CanApplySnapshot(req.GroupID, req.Message.Snapshot) {
// If the storage cannot accept the snapshot, drop it before
// passing it to multiNode.Step, since our error handling
// options past that point are limited.
return
}
}

s.CacheReplicaDescriptor(req.GroupID, req.FromReplica)
Expand Down
11 changes: 11 additions & 0 deletions multiraft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ type Storage interface {
ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)

// CanApplySnapshot should return false if attempting to apply the
// given snapshot would result in an error. This allows snapshots to
// be dropped cleanly since errors deep inside raft often result in
// panics.
CanApplySnapshot(groupID roachpb.RangeID, snap raftpb.Snapshot) bool

// GroupLocker returns a lock which (if non-nil) will be acquired
// when a group is being created (which entails multiple calls to
// Storage and StateMachine methods and may race with the removal of
Expand Down Expand Up @@ -118,6 +124,11 @@ func (m *MemoryStorage) ReplicasFromSnapshot(_ raftpb.Snapshot) ([]roachpb.Repli
return nil, nil
}

// CanApplySnapshot implements the Storage interface.
func (m *MemoryStorage) CanApplySnapshot(_ roachpb.RangeID, _ raftpb.Snapshot) bool {
return true
}

// GroupLocker implements the Storage interface by returning nil.
func (m *MemoryStorage) GroupLocker() sync.Locker {
return nil
Expand Down
4 changes: 4 additions & 0 deletions multiraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (b *BlockableStorage) ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb
return b.storage.ReplicasFromSnapshot(snap)
}

func (b *BlockableStorage) CanApplySnapshot(groupID roachpb.RangeID, snap raftpb.Snapshot) bool {
return b.storage.CanApplySnapshot(groupID, snap)
}

func (b *BlockableStorage) GroupLocker() sync.Locker {
return b.storage.GroupLocker()
}
Expand Down
167 changes: 167 additions & 0 deletions storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,170 @@ func TestStoreRangeSystemSplits(t *testing.T) {
t.Errorf("expected splits not found: %s", err)
}
}

// setupSplitSnapshotRace engineers a situation in which a range has
// been split but node 3 hasn't processed it yet. There is a race
// depending on whether node 3 learns of the split from its left or
// right side. When this function returns most of the nodes will be
// stopped, and depending on the order in which they are restarted, we
// can arrange for both possible outcomes of the race.
//
// Range 1 is the system keyspace, located on node 0.
// The range containing leftKey is the left side of the split, located on nodes 1, 2, and 3.
// The range containing rightKey is the right side of the split, located on nodes 3, 4, and 5.
// Nodes 1-5 are stopped; only node 0 is running.
//
// See https://github.com/cockroachdb/cockroach/issues/1644.
func setupSplitSnapshotRace(t *testing.T) (mtc *multiTestContext, leftKey roachpb.Key, rightKey roachpb.Key) {
mtc = startMultiTestContext(t, 6)

leftKey = roachpb.Key("a")
rightKey = roachpb.Key("z")

// First, do a couple of writes; we'll use these to determine when
// the dust has settled.
incArgs := incrementArgs(leftKey, 1)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
incArgs = incrementArgs(rightKey, 2)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}

// Split the system range from the rest of the keyspace.
splitArgs := adminSplitArgs(roachpb.KeyMin, keys.SystemMax)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &splitArgs); err != nil {
t.Fatal(err)
}

// Get the left range's ID. This is currently 2, but using
// LookupReplica is more future-proof (and see below for
// rightRangeID).
leftRangeID := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil).Desc().RangeID

// Replicate the left range onto nodes 1-3 and remove it from node 0.
mtc.replicateRange(leftRangeID, 0, 1, 2, 3)
mtc.unreplicateRange(leftRangeID, 0, 0)
mtc.expireLeaderLeases()

mtc.waitForValues(leftKey, 3*time.Second, []int64{0, 1, 1, 1, 0, 0})
mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 2, 2, 2, 0, 0})

// Stop node 3 so it doesn't hear about the split.
mtc.stopStore(3)
mtc.expireLeaderLeases()

// Split the data range.
splitArgs = adminSplitArgs(keys.SystemMax, roachpb.Key("m"))
if _, err := client.SendWrapped(mtc.distSender, nil, &splitArgs); err != nil {
t.Fatal(err)
}

// Get the right range's ID. Since the split was performed on node
// 1, it is currently 11 and not 3 as might be expected.
rightRangeID := mtc.stores[1].LookupReplica(roachpb.RKey("z"), nil).Desc().RangeID

// Relocate the right range onto nodes 3-5.
mtc.replicateRange(rightRangeID, 1, 4, 5)
mtc.unreplicateRange(rightRangeID, 1, 2)
mtc.unreplicateRange(rightRangeID, 1, 1)

mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 2, 2, 2})

// Stop the remaining data stores.
mtc.stopStore(1)
mtc.stopStore(2)
// 3 is already stopped.
mtc.stopStore(4)
mtc.stopStore(5)
mtc.expireLeaderLeases()

return mtc, leftKey, rightKey
}

// TestSplitSnapshotRace_SplitWins exercises one outcome of the
// split/snapshot race: The left side of the split propagates first,
// so the split completes before it sees a competing snapshot. This is
// the more common outcome in practice.
func TestSplitSnapshotRace_SplitWins(t *testing.T) {
defer leaktest.AfterTest(t)
mtc, leftKey, rightKey := setupSplitSnapshotRace(t)
defer mtc.Stop()

// Bring the left range up first so that the split happens before it sees a snapshot.
for i := 1; i <= 3; i++ {
mtc.restartStore(i)
}

// Perform a write on the left range and wait for it to propagate.
incArgs := incrementArgs(leftKey, 10)
if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(leftKey, 3*time.Second, []int64{0, 11, 11, 11, 0, 0})

// Now wake the other stores up.
mtc.restartStore(4)
mtc.restartStore(5)

// Write to the right range.
incArgs = incrementArgs(rightKey, 20)
if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 22, 22, 22})
}

// TestSplitSnapshotRace_SplitWins exercises one outcome of the
// split/snapshot race: The right side of the split replicates first,
// so target node sees a raft snapshot before it has processed the split,
// so it still has a conflicting range.
func TestSplitSnapshotRace_SnapshotWins(t *testing.T) {
defer leaktest.AfterTest(t)
mtc, leftKey, rightKey := setupSplitSnapshotRace(t)
defer mtc.Stop()

// Bring the right range up first.
for i := 3; i <= 5; i++ {
mtc.restartStore(i)
}

// Perform a write on the right range.
incArgs := incrementArgs(rightKey, 20)
if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
t.Fatal(err)
}

// It immediately propagates between nodes 4 and 5, but node 3
// remains at its old value. It can't accept the right-hand range
// because it conflicts with its not-yet-split copy of the left-hand
// range. This test is not completely deterministic: we want to make
// sure that node 3 doesn't panic when it receives the snapshot, but
// since it silently drops the message there is nothing we can wait
// for. There is a high probability that the message will have been
// received by the time that nodes 4 and 5 have processed their
// update.
mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 2, 22, 22})

// Wake up the left-hand range. This will allow the left-hand
// range's split to complete and unblock the right-hand range.
mtc.restartStore(1)
mtc.restartStore(2)

// Perform writes on both sides. This is not strictly necessary but
// it helps wake up dormant ranges that would otherwise have to wait
// for retry timeouts.
incArgs = incrementArgs(leftKey, 10)
if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(leftKey, 3*time.Second, []int64{0, 11, 11, 11, 0, 0})

incArgs = incrementArgs(rightKey, 200)
if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 222, 222, 222})

}
22 changes: 19 additions & 3 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ type multiTestContext struct {
gossip *gossip.Gossip
storePool *storage.StorePool
transport multiraft.Transport
distSender *kv.DistSender
db *client.DB
feed *util.Feed

Expand Down Expand Up @@ -214,12 +215,12 @@ func (m *multiTestContext) Start(t *testing.T, numStores int) {
m.senders = append(m.senders, kv.NewLocalSender())

if m.db == nil {
distSender := kv.NewDistSender(&kv.DistSenderContext{
m.distSender = kv.NewDistSender(&kv.DistSenderContext{
Clock: m.clock,
RangeDescriptorDB: m.senders[0],
RPCSend: m.rpcSend,
}, m.gossip)
sender := kv.NewTxnCoordSender(distSender, m.clock, false, nil, m.clientStopper)
sender := kv.NewTxnCoordSender(m.distSender, m.clock, false, nil, m.clientStopper)
m.db = client.NewDB(sender)
}

Expand Down Expand Up @@ -296,7 +297,19 @@ func (m *multiTestContext) rpcSend(_ rpc.Options, _ string, addrs []net.Addr,
if stErr != nil {
m.t.Fatal(stErr)
}
br, pErr = m.senders[nodeID-1].Send(context.Background(), ba)
nodeIndex := nodeID - 1
// The rpcSend method crosses store boundaries: it is possible that the
// destination store is stopped while the source is still running.
// Run the send in a Task on the destination store to simulate what
// would happen with real RPCs.
if s := m.stoppers[nodeIndex]; s == nil || !s.RunTask(func() {
br, pErr = m.senders[nodeIndex].Send(context.Background(), ba)
}) {
pErr = &roachpb.Error{}
pErr.SetGoError(rpc.NewSendError("store is stopped", false))
m.expireLeaderLeases()
continue
}
if pErr == nil {
return []proto.Message{br}, nil
}
Expand All @@ -309,6 +322,9 @@ func (m *multiTestContext) rpcSend(_ rpc.Options, _ string, addrs []net.Addr,
// from the group. Move the manual clock forward in an attempt to
// expire the lease.
m.expireLeaderLeases()
} else if m.stores[tErr.Leader.NodeID-1] == nil {
// The leader is known but down, so expire its lease.
m.expireLeaderLeases()
}
default:
if testutils.IsError(tErr, `store \d+ not found`) {
Expand Down
35 changes: 35 additions & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,13 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error {
if s.replicasByKey.ReplaceOrInsert(origRng) != nil {
return util.Errorf("couldn't insert range %v in rangesByKey btree", origRng)
}

// If we have an uninitialized replica of the new range, delete it to make
// way for the complete one created by the split.
if _, ok := s.uninitReplicas[newDesc.RangeID]; ok {
delete(s.uninitReplicas, newDesc.RangeID)
delete(s.replicas, newDesc.RangeID)
}
if err := s.addReplicaInternal(newRng); err != nil {
return util.Errorf("couldn't insert range %v in rangesByKey btree: %s", newRng, err)
}
Expand Down Expand Up @@ -1599,6 +1606,34 @@ func (s *Store) GroupLocker() sync.Locker {
return &s.raftGroupLocker
}

// CanApplySnapshot implements the multiraft.Storage interface.
func (s *Store) CanApplySnapshot(rangeID roachpb.RangeID, snap raftpb.Snapshot) bool {
s.mu.Lock()
defer s.mu.Unlock()
if r, ok := s.replicas[rangeID]; ok && r.isInitialized() {
// We have the range and it's initialized, so let the snapshot
// through.
return true
}

// We don't have the range (or we have an uninitialized
// placeholder). Will we be able to create/initialize it?
// TODO(bdarnell): can we avoid parsing this twice?
var parsedSnap roachpb.RaftSnapshotData
if err := parsedSnap.Unmarshal(snap.Data); err != nil {
return false
}

if s.replicasByKey.Has(rangeBTreeKey(parsedSnap.RangeDescriptor.EndKey)) {
// We have a conflicting range, so we must block the snapshot.
// When such a conflict exists, it will be resolved by one range
// either being split or garbage collected.
return false
}

return true
}

// AppliedIndex implements the multiraft.StateMachine interface.
func (s *Store) AppliedIndex(groupID roachpb.RangeID) (uint64, error) {
s.mu.RLock()
Expand Down

0 comments on commit 37cb585

Please sign in to comment.