From 37cb585feca0ed74530456275b512a76c6bf325f Mon Sep 17 00:00:00 2001
From: Ben Darnell
Date: Wed, 28 Oct 2015 23:33:44 -0400
Subject: [PATCH] Fix a race between splits and snapshots.

When a range is split, followers of that range may receive a snapshot
from the right-hand side of the split before they have caught up and
processed the left-hand side where the split originated. This results
in a "range already exists" panic.

The solution is to silently drop any snapshots which would cause a
conflict. They will be retried and will succeed once the left-hand
range has performed its split.

Fixes #1644.

Also check destination stopper in multiTestContext.rpcSend
---
 multiraft/multiraft.go       |   9 ++
 multiraft/storage.go         |  11 +++
 multiraft/storage_test.go    |   4 +
 storage/client_split_test.go | 167 +++++++++++++++++++++++++++++++++++
 storage/client_test.go       |  22 ++++-
 storage/store.go             |  35 ++++++++
 6 files changed, 245 insertions(+), 3 deletions(-)

diff --git a/multiraft/multiraft.go b/multiraft/multiraft.go
index c386c02341ab..dce651c16a7b 100644
--- a/multiraft/multiraft.go
+++ b/multiraft/multiraft.go
@@ -750,9 +750,18 @@ func (s *state) handleMessage(req *RaftMessageRequest) {
 	case raftpb.MsgHeartbeat:
 		s.fanoutHeartbeat(req)
 		return
+
 	case raftpb.MsgHeartbeatResp:
 		s.fanoutHeartbeatResponse(req)
 		return
+
+	case raftpb.MsgSnap:
+		if !s.Storage.CanApplySnapshot(req.GroupID, req.Message.Snapshot) {
+			// If the storage cannot accept the snapshot, drop it before
+			// passing it to multiNode.Step, since our error handling
+			// options past that point are limited.
+			return
+		}
 	}
 
 	s.CacheReplicaDescriptor(req.GroupID, req.FromReplica)
diff --git a/multiraft/storage.go b/multiraft/storage.go
index 04a322b47d1b..9821ae8241db 100644
--- a/multiraft/storage.go
+++ b/multiraft/storage.go
@@ -53,6 +53,12 @@
 	ReplicaIDForStore(groupID roachpb.RangeID, storeID roachpb.StoreID) (roachpb.ReplicaID, error)
 	ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb.ReplicaDescriptor, error)
 
+	// CanApplySnapshot should return false if attempting to apply the
+	// given snapshot would result in an error. This allows snapshots to
+	// be dropped cleanly since errors deep inside raft often result in
+	// panics.
+	CanApplySnapshot(groupID roachpb.RangeID, snap raftpb.Snapshot) bool
+
 	// GroupLocker returns a lock which (if non-nil) will be acquired
 	// when a group is being created (which entails multiple calls to
 	// Storage and StateMachine methods and may race with the removal of
@@ -118,6 +124,11 @@ func (m *MemoryStorage) ReplicasFromSnapshot(_ raftpb.Snapshot) ([]roachpb.Repli
 	return nil, nil
 }
 
+// CanApplySnapshot implements the Storage interface.
+func (m *MemoryStorage) CanApplySnapshot(_ roachpb.RangeID, _ raftpb.Snapshot) bool {
+	return true
+}
+
 // GroupLocker implements the Storage interface by returning nil.
 func (m *MemoryStorage) GroupLocker() sync.Locker {
 	return nil
diff --git a/multiraft/storage_test.go b/multiraft/storage_test.go
index 0bfdea09a4c6..d9f9df88b947 100644
--- a/multiraft/storage_test.go
+++ b/multiraft/storage_test.go
@@ -53,6 +53,10 @@ func (b *BlockableStorage) ReplicasFromSnapshot(snap raftpb.Snapshot) ([]roachpb
 	return b.storage.ReplicasFromSnapshot(snap)
 }
 
+func (b *BlockableStorage) CanApplySnapshot(groupID roachpb.RangeID, snap raftpb.Snapshot) bool {
+	return b.storage.CanApplySnapshot(groupID, snap)
+}
+
 func (b *BlockableStorage) GroupLocker() sync.Locker {
 	return b.storage.GroupLocker()
 }
diff --git a/storage/client_split_test.go b/storage/client_split_test.go
index 960894d496b7..2a81b786b53a 100644
--- a/storage/client_split_test.go
+++ b/storage/client_split_test.go
@@ -600,3 +600,170 @@ func TestStoreRangeSystemSplits(t *testing.T) {
 		t.Errorf("expected splits not found: %s", err)
 	}
 }
+
+// setupSplitSnapshotRace engineers a situation in which a range has
+// been split but node 3 hasn't processed it yet. There is a race
+// depending on whether node 3 learns of the split from its left or
+// right side. When this function returns, most of the nodes will be
+// stopped, and depending on the order in which they are restarted, we
+// can arrange for both possible outcomes of the race.
+//
+// Range 1 is the system keyspace, located on node 0.
+// The range containing leftKey is the left side of the split, located on nodes 1, 2, and 3.
+// The range containing rightKey is the right side of the split, located on nodes 3, 4, and 5.
+// Nodes 1-5 are stopped; only node 0 is running.
+//
+// See https://github.com/cockroachdb/cockroach/issues/1644.
+func setupSplitSnapshotRace(t *testing.T) (mtc *multiTestContext, leftKey roachpb.Key, rightKey roachpb.Key) {
+	mtc = startMultiTestContext(t, 6)
+
+	leftKey = roachpb.Key("a")
+	rightKey = roachpb.Key("z")
+
+	// First, do a couple of writes; we'll use these to determine when
+	// the dust has settled.
+	incArgs := incrementArgs(leftKey, 1)
+	if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+	incArgs = incrementArgs(rightKey, 2)
+	if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// Split the system range from the rest of the keyspace.
+	splitArgs := adminSplitArgs(roachpb.KeyMin, keys.SystemMax)
+	if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &splitArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// Get the left range's ID. This is currently 2, but using
+	// LookupReplica is more future-proof (and see below for
+	// rightRangeID).
+	leftRangeID := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil).Desc().RangeID
+
+	// Replicate the left range onto nodes 1-3 and remove it from node 0.
+	mtc.replicateRange(leftRangeID, 0, 1, 2, 3)
+	mtc.unreplicateRange(leftRangeID, 0, 0)
+	mtc.expireLeaderLeases()
+
+	mtc.waitForValues(leftKey, 3*time.Second, []int64{0, 1, 1, 1, 0, 0})
+	mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 2, 2, 2, 0, 0})
+
+	// Stop node 3 so it doesn't hear about the split.
+	mtc.stopStore(3)
+	mtc.expireLeaderLeases()
+
+	// Split the data range.
+	splitArgs = adminSplitArgs(keys.SystemMax, roachpb.Key("m"))
+	if _, err := client.SendWrapped(mtc.distSender, nil, &splitArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// Get the right range's ID. Since the split was performed on node
+	// 1, it is currently 11 and not 3 as might be expected.
+	rightRangeID := mtc.stores[1].LookupReplica(roachpb.RKey("z"), nil).Desc().RangeID
+
+	// Relocate the right range onto nodes 3-5.
+	mtc.replicateRange(rightRangeID, 1, 4, 5)
+	mtc.unreplicateRange(rightRangeID, 1, 2)
+	mtc.unreplicateRange(rightRangeID, 1, 1)
+
+	mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 2, 2, 2})
+
+	// Stop the remaining data stores.
+	mtc.stopStore(1)
+	mtc.stopStore(2)
+	// 3 is already stopped.
+	mtc.stopStore(4)
+	mtc.stopStore(5)
+	mtc.expireLeaderLeases()
+
+	return mtc, leftKey, rightKey
+}
+
+// TestSplitSnapshotRace_SplitWins exercises one outcome of the
+// split/snapshot race: The left side of the split propagates first,
+// so the split completes before it sees a competing snapshot. This is
+// the more common outcome in practice.
+func TestSplitSnapshotRace_SplitWins(t *testing.T) {
+	defer leaktest.AfterTest(t)
+	mtc, leftKey, rightKey := setupSplitSnapshotRace(t)
+	defer mtc.Stop()
+
+	// Bring the left range up first so that the split happens before it sees a snapshot.
+	for i := 1; i <= 3; i++ {
+		mtc.restartStore(i)
+	}
+
+	// Perform a write on the left range and wait for it to propagate.
+	incArgs := incrementArgs(leftKey, 10)
+	if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+	mtc.waitForValues(leftKey, 3*time.Second, []int64{0, 11, 11, 11, 0, 0})
+
+	// Now wake the other stores up.
+	mtc.restartStore(4)
+	mtc.restartStore(5)
+
+	// Write to the right range.
+	incArgs = incrementArgs(rightKey, 20)
+	if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+	mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 22, 22, 22})
+}
+
+// TestSplitSnapshotRace_SnapshotWins exercises one outcome of the
+// split/snapshot race: The right side of the split replicates first,
+// so the target node sees a raft snapshot before it has processed the
+// split, while it still has a conflicting range.
+func TestSplitSnapshotRace_SnapshotWins(t *testing.T) {
+	defer leaktest.AfterTest(t)
+	mtc, leftKey, rightKey := setupSplitSnapshotRace(t)
+	defer mtc.Stop()
+
+	// Bring the right range up first.
+	for i := 3; i <= 5; i++ {
+		mtc.restartStore(i)
+	}
+
+	// Perform a write on the right range.
+	incArgs := incrementArgs(rightKey, 20)
+	if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// It immediately propagates between nodes 4 and 5, but node 3
+	// remains at its old value. It can't accept the right-hand range
+	// because it conflicts with its not-yet-split copy of the left-hand
+	// range. This test is not completely deterministic: we want to make
+	// sure that node 3 doesn't panic when it receives the snapshot, but
+	// since it silently drops the message there is nothing we can wait
+	// for. There is a high probability that the message will have been
+	// received by the time that nodes 4 and 5 have processed their
+	// update.
+	mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 2, 22, 22})
+
+	// Wake up the left-hand range. This will allow the left-hand
+	// range's split to complete and unblock the right-hand range.
+	mtc.restartStore(1)
+	mtc.restartStore(2)
+
+	// Perform writes on both sides. This is not strictly necessary but
+	// it helps wake up dormant ranges that would otherwise have to wait
+	// for retry timeouts.
+	incArgs = incrementArgs(leftKey, 10)
+	if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+	mtc.waitForValues(leftKey, 3*time.Second, []int64{0, 11, 11, 11, 0, 0})
+
+	incArgs = incrementArgs(rightKey, 200)
+	if _, err := client.SendWrapped(mtc.distSender, nil, &incArgs); err != nil {
+		t.Fatal(err)
+	}
+	mtc.waitForValues(rightKey, 3*time.Second, []int64{0, 0, 0, 222, 222, 222})
+
+}
diff --git a/storage/client_test.go b/storage/client_test.go
index 72500cda90c2..73a563aa57df 100644
--- a/storage/client_test.go
+++ b/storage/client_test.go
@@ -142,6 +142,7 @@ type multiTestContext struct {
 	gossip     *gossip.Gossip
 	storePool  *storage.StorePool
 	transport  multiraft.Transport
+	distSender *kv.DistSender
 	db         *client.DB
 	feed       *util.Feed
@@ -214,12 +215,12 @@ func (m *multiTestContext) Start(t *testing.T, numStores int) {
 		m.senders = append(m.senders, kv.NewLocalSender())
 
 		if m.db == nil {
-			distSender := kv.NewDistSender(&kv.DistSenderContext{
+			m.distSender = kv.NewDistSender(&kv.DistSenderContext{
 				Clock:             m.clock,
 				RangeDescriptorDB: m.senders[0],
 				RPCSend:           m.rpcSend,
 			}, m.gossip)
-			sender := kv.NewTxnCoordSender(distSender, m.clock, false, nil, m.clientStopper)
+			sender := kv.NewTxnCoordSender(m.distSender, m.clock, false, nil, m.clientStopper)
 			m.db = client.NewDB(sender)
 		}
 
@@ -296,7 +297,19 @@ func (m *multiTestContext) rpcSend(_ rpc.Options, _ string, addrs []net.Addr,
 		if stErr != nil {
 			m.t.Fatal(stErr)
 		}
-		br, pErr = m.senders[nodeID-1].Send(context.Background(), ba)
+		nodeIndex := nodeID - 1
+		// The rpcSend method crosses store boundaries: it is possible that the
+		// destination store is stopped while the source is still running.
+		// Run the send in a Task on the destination store to simulate what
+		// would happen with real RPCs.
+		if s := m.stoppers[nodeIndex]; s == nil || !s.RunTask(func() {
+			br, pErr = m.senders[nodeIndex].Send(context.Background(), ba)
+		}) {
+			pErr = &roachpb.Error{}
+			pErr.SetGoError(rpc.NewSendError("store is stopped", false))
+			m.expireLeaderLeases()
+			continue
+		}
 		if pErr == nil {
 			return []proto.Message{br}, nil
 		}
@@ -309,6 +322,9 @@ func (m *multiTestContext) rpcSend(_ rpc.Options, _ string, addrs []net.Addr,
 				// from the group. Move the manual clock forward in an attempt to
 				// expire the lease.
 				m.expireLeaderLeases()
+			} else if m.stores[tErr.Leader.NodeID-1] == nil {
+				// The leader is known but down, so expire its lease.
+				m.expireLeaderLeases()
 			}
 		default:
 			if testutils.IsError(tErr, `store \d+ not found`) {
diff --git a/storage/store.go b/storage/store.go
index cfb9b3be8250..d4b4f9ac813d 100644
--- a/storage/store.go
+++ b/storage/store.go
@@ -918,6 +918,13 @@ func (s *Store) SplitRange(origRng, newRng *Replica) error {
 	if s.replicasByKey.ReplaceOrInsert(origRng) != nil {
 		return util.Errorf("couldn't insert range %v in rangesByKey btree", origRng)
 	}
+
+	// If we have an uninitialized replica of the new range, delete it to make
+	// way for the complete one created by the split.
+	if _, ok := s.uninitReplicas[newDesc.RangeID]; ok {
+		delete(s.uninitReplicas, newDesc.RangeID)
+		delete(s.replicas, newDesc.RangeID)
+	}
 	if err := s.addReplicaInternal(newRng); err != nil {
 		return util.Errorf("couldn't insert range %v in rangesByKey btree: %s", newRng, err)
 	}
@@ -1599,6 +1606,34 @@ func (s *Store) GroupLocker() sync.Locker {
 	return &s.raftGroupLocker
 }
 
+// CanApplySnapshot implements the multiraft.Storage interface.
+func (s *Store) CanApplySnapshot(rangeID roachpb.RangeID, snap raftpb.Snapshot) bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if r, ok := s.replicas[rangeID]; ok && r.isInitialized() {
+		// We have the range and it's initialized, so let the snapshot
+		// through.
+		return true
+	}
+
+	// We don't have the range (or we have an uninitialized
+	// placeholder). Will we be able to create/initialize it?
+	// TODO(bdarnell): can we avoid parsing this twice?
+	var parsedSnap roachpb.RaftSnapshotData
+	if err := parsedSnap.Unmarshal(snap.Data); err != nil {
+		return false
+	}
+
+	if s.replicasByKey.Has(rangeBTreeKey(parsedSnap.RangeDescriptor.EndKey)) {
+		// We have a conflicting range, so we must block the snapshot.
+		// When such a conflict exists, it will be resolved by one range
+		// either being split or garbage collected.
+		return false
+	}
+
+	return true
+}
+
 // AppliedIndex implements the multiraft.StateMachine interface.
 func (s *Store) AppliedIndex(groupID roachpb.RangeID) (uint64, error) {
 	s.mu.RLock()
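
As a reviewer's aside, here is a minimal, self-contained sketch (not part of this patch) of the drop-or-apply decision that CanApplySnapshot implements. The names shouldDropSnapshot and hasConflictingReplica are hypothetical stand-ins for the Store internals above (the replicas map and the replicasByKey btree lookup), used only to illustrate the control flow that keeps a conflicting snapshot out of raft.

package main

import "fmt"

// hasConflictingReplica is a hypothetical stand-in for the
// s.replicasByKey.Has(rangeBTreeKey(...)) check in the patch: it reports
// whether an existing replica already covers the snapshot's end key.
type hasConflictingReplica func(endKey string) bool

// shouldDropSnapshot mirrors the flow added above: an incoming raft snapshot
// is dropped (instead of being stepped into raft, where a failure would
// panic) when the receiving store has no initialized replica for the range
// and a conflicting replica still exists because the split that created the
// range has not been processed locally yet.
func shouldDropSnapshot(initialized bool, snapEndKey string, conflicts hasConflictingReplica) bool {
	if initialized {
		// The replica exists and is initialized; the snapshot can be applied.
		return false
	}
	// No initialized replica: a conflicting (not-yet-split) replica means the
	// snapshot must be dropped now and retried by raft later.
	return conflicts(snapEndKey)
}

func main() {
	// Hypothetical scenario from the test above: node 3 still holds the
	// pre-split range ending at "z" when the right-hand range's snapshot
	// (which also ends at "z") arrives.
	conflicts := func(endKey string) bool { return endKey == "z" }
	fmt.Println(shouldDropSnapshot(false, "z", conflicts)) // true: drop and let raft retry
	fmt.Println(shouldDropSnapshot(true, "z", conflicts))  // false: already initialized, apply
}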