From 6adf047196e5fd6f253e550b8ddf1bcc92fa1872 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sat, 9 Dec 2017 01:38:35 -0500 Subject: [PATCH 1/2] storage: respect context deadline in idAllocator.Allocate Release note: None --- pkg/storage/id_alloc.go | 16 ++++++++++------ pkg/storage/id_alloc_test.go | 24 +++++++++++++++++------- pkg/storage/replica_command.go | 2 +- pkg/storage/store.go | 4 ++-- pkg/storage/store_test.go | 6 ++++-- 5 files changed, 34 insertions(+), 18 deletions(-) diff --git a/pkg/storage/id_alloc.go b/pkg/storage/id_alloc.go index 46b65a34b006..178336073d54 100644 --- a/pkg/storage/id_alloc.go +++ b/pkg/storage/id_alloc.go @@ -81,15 +81,19 @@ func newIDAllocator( } // Allocate allocates a new ID from the global KV DB. -func (ia *idAllocator) Allocate() (uint32, error) { +func (ia *idAllocator) Allocate(ctx context.Context) (uint32, error) { ia.once.Do(ia.start) - id := <-ia.ids - // when the channel is closed, the zero value is returned. - if id == 0 { - return id, errors.Errorf("could not allocate ID; system is draining") + select { + case id := <-ia.ids: + // when the channel is closed, the zero value is returned. + if id == 0 { + return id, errors.Errorf("could not allocate ID; system is draining") + } + return id, nil + case <-ctx.Done(): + return 0, ctx.Err() } - return id, nil } func (ia *idAllocator) start() { diff --git a/pkg/storage/id_alloc_test.go b/pkg/storage/id_alloc_test.go index c580a3cec1f4..1fbc91fb149d 100644 --- a/pkg/storage/id_alloc_test.go +++ b/pkg/storage/id_alloc_test.go @@ -56,7 +56,7 @@ func TestIDAllocator(t *testing.T) { for i := 0; i < maxI; i++ { go func() { for j := 0; j < maxJ; j++ { - id, err := idAlloc.Allocate() + id, err := idAlloc.Allocate(context.Background()) errChan <- err allocd <- id } @@ -111,7 +111,7 @@ func TestIDAllocatorNegativeValue(t *testing.T) { if err != nil { t.Errorf("failed to create IDAllocator: %v", err) } - value, err := idAlloc.Allocate() + value, err := idAlloc.Allocate(context.Background()) if err != nil { t.Fatal(err) } @@ -158,7 +158,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) { t.Errorf("failed to create IDAllocator: %v", err) } - firstID, err := idAlloc.Allocate() + firstID, err := idAlloc.Allocate(context.Background()) if err != nil { t.Fatal(err) } @@ -172,7 +172,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) { // Should be able to get the allocated IDs, and there will be one // background allocateBlock to get ID continuously. for i := 0; i < 8; i++ { - id, err := idAlloc.Allocate() + id, err := idAlloc.Allocate(context.Background()) if err != nil { t.Fatal(err) } @@ -194,7 +194,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) { errChan <- nil } - id, err := idAlloc.Allocate() + id, err := idAlloc.Allocate(context.Background()) errChan <- err allocd <- id }() @@ -207,6 +207,16 @@ func TestAllocateErrorAndRecovery(t *testing.T) { } } + // Attempt a few allocations with a context timeout while allocations are + // blocked. All attempts should hit a context deadline exceeded error. + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + for i := 0; i < routines; i++ { + id, err := idAlloc.Allocate(ctx) + if id != 0 || err != context.DeadlineExceeded { + t.Errorf("expected context cancellation, found id=%d, err=%v", id, err) + } + } + // Make the IDAllocator valid again. idAlloc.idKey.Store(keys.RangeIDGenerator) // Check if the blocked allocations return expected ID. @@ -226,7 +236,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) { // Check if the following allocations return expected ID. for i := 0; i < routines; i++ { - id, err := idAlloc.Allocate() + id, err := idAlloc.Allocate(context.Background()) if err != nil { t.Fatal(err) } @@ -254,7 +264,7 @@ func TestAllocateWithStopper(t *testing.T) { return idAlloc }() - if _, err := idAlloc.Allocate(); !testutils.IsError(err, "system is draining") { + if _, err := idAlloc.Allocate(context.Background()); !testutils.IsError(err, "system is draining") { t.Errorf("unexpected error: %v", err) } } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 593b0a5b1489..56fe8deb5993 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -2720,7 +2720,7 @@ func (r *Replica) adminSplitWithDescriptor( log.Event(ctx, "found split key") // Create right hand side range descriptor with the newly-allocated Range ID. - rightDesc, err := r.store.NewRangeDescriptor(splitKey, desc.EndKey, desc.Replicas) + rightDesc, err := r.store.NewRangeDescriptor(ctx, splitKey, desc.EndKey, desc.Replicas) if err != nil { return reply, true, roachpb.NewErrorf("unable to allocate right hand side range descriptor: %s", err) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a63d2e2978c1..607ef7518ac1 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1778,9 +1778,9 @@ func (s *Store) IsDraining() bool { // range ID and returns a RangeDescriptor whose Replicas are a copy // of the supplied replicas slice, with appropriate ReplicaIDs assigned. func (s *Store) NewRangeDescriptor( - start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor, + ctx context.Context, start, end roachpb.RKey, replicas []roachpb.ReplicaDescriptor, ) (*roachpb.RangeDescriptor, error) { - id, err := s.rangeIDAlloc.Allocate() + id, err := s.rangeIDAlloc.Allocate(ctx) if err != nil { return nil, err } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 37dab066d04b..809c43d57a4b 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1002,7 +1002,8 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep if repl == nil { t.Fatalf("couldn't lookup range for key %q", key) } - desc, err := store.NewRangeDescriptor(splitKey, repl.Desc().EndKey, repl.Desc().Replicas) + desc, err := store.NewRangeDescriptor( + context.Background(), splitKey, repl.Desc().EndKey, repl.Desc().Replicas) if err != nil { t.Fatal(err) } @@ -1062,7 +1063,8 @@ func TestStoreRangeIDAllocation(t *testing.T) { // to rangeIDAllocCount * 3 + 1. for i := 0; i < rangeIDAllocCount*3; i++ { replicas := []roachpb.ReplicaDescriptor{{StoreID: store.StoreID()}} - desc, err := store.NewRangeDescriptor(roachpb.RKey(fmt.Sprintf("%03d", i)), roachpb.RKey(fmt.Sprintf("%03d", i+1)), replicas) + desc, err := store.NewRangeDescriptor(context.Background(), + roachpb.RKey(fmt.Sprintf("%03d", i)), roachpb.RKey(fmt.Sprintf("%03d", i+1)), replicas) if err != nil { t.Fatal(err) } From 4f44c54b51a48e15eddabc996eb1297f9dbd9750 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sat, 9 Dec 2017 04:09:46 -0500 Subject: [PATCH 2/2] storage: add permitLargeSnapshots flag to replica In a privately reported user issue, we've seen that [our attempts](https://github.com/cockroachdb/cockroach/pull/7788) at [preventing large snapshots](https://github.com/cockroachdb/cockroach/issues/7581) can result in replica unavailability. Our current approach to limiting large snapshots assumes is that its ok to block snapshots indefinitely while waiting for a range to first split. Unfortunately, this can create a dependency cycle where a range requires a snapshot to split (because it can't achieve an up-to-date quorum without it) but isn't allowed to perform a snapshot until its size is reduced below the threshold. This can result in unavailability even when a majority of replicas remain live. Currently, we still need this snapshot size limit because unbounded snapshots can result in OOM errors that crash entire nodes. However, once snapshots are streamed from disk to disk, never needing to buffer in-memory on the sending or receiving side, we should be able to remove any snapshot size limit (see #16954). As a holdover, this change introduces a `permitLargeSnapshots` flag on a replica which is set when the replica is too large to snapshot but observes splits failing. When set, the flag allows snapshots to ignore the size limit until the snapshot goes through and splits are able to succeed again. Release note (bug fix): Fixed a scenario where a range that is too big to snapshot can lose availability even with a majority of nodes alive. --- pkg/storage/client_raft_test.go | 6 +- pkg/storage/client_split_test.go | 170 +++++++++++++++++++++++- pkg/storage/client_test.go | 6 + pkg/storage/helpers_test.go | 14 ++ pkg/storage/queue.go | 7 + pkg/storage/replica.go | 16 ++- pkg/storage/replica_raftstorage.go | 14 +- pkg/storage/replica_raftstorage_test.go | 14 ++ pkg/storage/split_queue.go | 11 ++ pkg/storage/store.go | 5 + 10 files changed, 251 insertions(+), 12 deletions(-) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 8032bb87dab2..d752ca2ccc24 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -3635,13 +3635,9 @@ func TestInitRaftGroupOnRequest(t *testing.T) { t.Fatal("replica should not be nil for RHS range") } - // TODO(spencer): Raft messages seem to turn up - // occasionally on restart, which initialize the replica, so - // this is not a test failure. Not sure how to work around this - // problem. // Verify the raft group isn't initialized yet. if repl.IsRaftGroupInitialized() { - log.Errorf(context.TODO(), "expected raft group to be uninitialized") + t.Fatal("expected raft group to be uninitialized") } // Send an increment and verify that initializes the Raft group. diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index ec369d0dfd6f..c39ed270582e 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" @@ -789,7 +790,7 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) { // fillRange writes keys with the given prefix and associated values // until bytes bytes have been written or the given range has split. func fillRange( - store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64, t *testing.T, + t *testing.T, store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64, ) { src := rand.New(rand.NewSource(0)) for { @@ -802,7 +803,7 @@ func fillRange( return } key := append(append([]byte(nil), prefix...), randutil.RandBytes(src, 100)...) - key = keys.MakeFamilyKey(key, 0) + key = keys.MakeFamilyKey(key, src.Uint32()) val := randutil.RandBytes(src, int(src.Int31n(1<<8))) pArgs := putArgs(key, val) _, pErr := client.SendWrappedWith(context.Background(), store, roachpb.Header{ @@ -861,7 +862,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) { } // Look in the range after prefix we're writing to. - fillRange(store, repl.RangeID, tableBoundary, maxBytes, t) + fillRange(t, store, repl.RangeID, tableBoundary, maxBytes) } // Verify that the range is in fact split. @@ -912,6 +913,169 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) { }) } +// TestStoreRangeSplitAfterLargeSnapshot tests a scenario where a range is too +// large to snapshot a follower, but is unable to split because it cannot +// achieve quorum. The leader of the range should adapt to this, eventually +// permitting the large snapshot so that it can recover and then split +// successfully. +func TestStoreRangeSplitAfterLargeSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Set maxBytes to something small so we can exceed the maximum snapshot + // size without adding 2x64MB of data. + const maxBytes = 1 << 16 + defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{ + RangeMaxBytes: maxBytes, + })() + + // Create a three node cluster. + sc := storage.TestStoreConfig(nil) + sc.RaftElectionTimeoutTicks = 1000000 + mtc := &multiTestContext{storeConfig: &sc} + defer mtc.Stop() + mtc.Start(t, 3) + store0 := mtc.stores[0] + forAllLiveStores := func(f func(*storage.Store)) { + for _, store := range mtc.stores { + if store != nil { + f(store) + } + } + } + + // The behindNode falls behind far enough to require a snapshot. + const behindNode = 1 + // The crashingNode crashes after its single range becomes too large to + // snapshot. + const crashingNode = 2 + + // Wait for initial splits. + t.Log("waiting for initial splits") + forAllLiveStores(func(store *storage.Store) { + store.SetRaftSnapshotQueueActive(true) + store.SetSplitQueueActive(true) + store.ForceSplitScanAndProcess() + }) + if err := server.WaitForInitialSplits(store0.DB()); err != nil { + t.Fatal(err) + } + + // Then do a write; we'll use this to determine when the dust has settled. + t.Log("performing first write") + keyPrefix := append(keys.UserTableDataMin, []byte("key")...) + repl := store0.LookupReplica(roachpb.RKey(keyPrefix), nil) + rangeID := repl.RangeID + header := roachpb.Header{RangeID: rangeID} + incArgs := incrementArgs(keyPrefix, 1) + if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil { + t.Fatal(pErr) + } + + // Replicate the range we'll play with to the other nodes. + t.Log("replicating range") + mtc.replicateRange(rangeID, behindNode, crashingNode) + mtc.waitForValues(keyPrefix, []int64{1, 1, 1}) + + // Fill the range without allowing splits so that it will try to split once + // the splitQueue is re-enabled. Fill it past the snapshot size limit + // enforced in Replica.GetSnapshot. We do this before stopping behindNode so + // that the quotaPool does not throttle progress. + t.Log("filling range") + forAllLiveStores(func(store *storage.Store) { + store.SetSplitQueueActive(false) + }) + fillRange(t, store0, rangeID, keyPrefix, 2*maxBytes+1) + + // Turn off replica scanner and snapshot queue. We'll control queues + // directly from now on. + forAllLiveStores(func(store *storage.Store) { + store.SetReplicaScannerActive(false) + store.SetRaftSnapshotQueueActive(false) + }) + + // Stop behindNode so it falls behind and will require a snapshot. + t.Log("letting one follower fall behind") + mtc.stopStore(behindNode) + + // Let behindNode fall behind. + if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil { + t.Fatal(pErr) + } + mtc.waitForValues(keyPrefix, []int64{2, 1, 2}) + + // Truncate the replica's log. This ensures that the only way behindNode can + // recover is through a snapshot. + index, err := repl.GetLastIndex() + if err != nil { + t.Fatal(err) + } + truncArgs := truncateLogArgs(index+1, rangeID) + truncArgs.Key = repl.Desc().StartKey.AsRawKey() + if _, pErr := client.SendWrappedWith(context.Background(), store0, header, truncArgs); pErr != nil { + t.Fatal(pErr) + } + + // The range can still make forward progress. + if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil { + t.Fatal(pErr) + } + mtc.waitForValues(keyPrefix, []int64{3, 1, 3}) + + // Determine the range count. + prevRangeCount := store0.ReplicaCount() + + // Stop crashingNode so that we lose quorum and can no longer split. + // Bring behindNode back up. + t.Log("killing the other follower") + mtc.stopStore(crashingNode) + mtc.restartStore(behindNode) + + // Reactivate the split queues and reduce its timeout so it times out due + // to a lack of quorum faster. Force a split, which should fail because it + // cannot achieve quorum. This in turn should set the permitLargeSnapshot + // flag. + t.Log("attempting a split without quorum; this should fail") + forAllLiveStores(func(store *storage.Store) { + store.SetSplitQueueProcessTimeout(1 * time.Second) + store.SetSplitQueueActive(true) + store.ForceSplitScanAndProcess() + }) + testutils.SucceedsSoon(t, func() error { + if !repl.PermittingLargeSnapshots() { + return errors.Errorf("replica not permitting large snapshots") + } + return nil + }) + + // Now that the permitLargeSnapshot flag is set, we should see + // the range recover after behindNode is sent a snapshot. + t.Log("waiting for large snapshot to succeed") + forAllLiveStores(func(store *storage.Store) { + store.SetRaftSnapshotQueueActive(true) + store.ForceRaftSnapshotQueueProcess() + }) + mtc.waitForValues(keyPrefix, []int64{3, 3, 3}) + + // Once the range has a majority of up-to-date nodes, it should be + // able to split. We first increment the manual clock to make sure + // any dangling intents left by previous splits expire. + t.Log("waiting for split to succeed") + mtc.manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1) + forAllLiveStores(func(store *storage.Store) { + store.ForceSplitScanAndProcess() + }) + testutils.SucceedsSoon(t, func() error { + if store0.ReplicaCount() < prevRangeCount+1 { + return errors.Errorf("expected new range created by split") + } + return nil + }) + + // Per the contract on multiTestContext.stopStore, we need to restart the + // stopped store before calling multiTestContext.Stop. + mtc.restartStore(crashingNode) +} + // TestStoreRangeSystemSplits verifies that splits are based on the contents of // the SystemConfig span. func TestStoreRangeSystemSplits(t *testing.T) { diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 9dd520b012d8..89c8e9374cf6 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -874,6 +874,11 @@ func (m *multiTestContext) stopStore(i int) { m.mu.Lock() m.stoppers[i] = nil + // Break the transport breaker for this node so that messages sent between a + // store stopping and that store restarting will never remain in-flight in + // the transport and end up reaching the store. This has been the cause of + // flakiness in the past. + m.transport.GetCircuitBreaker(m.idents[i].NodeID).Break() m.senders[i].RemoveStore(m.stores[i]) m.stores[i] = nil m.mu.Unlock() @@ -905,6 +910,7 @@ func (m *multiTestContext) restartStore(i int) { m.t.Fatal(err) } m.senders[i].AddStore(store) + m.transport.GetCircuitBreaker(m.idents[i].NodeID).Reset() m.mu.Unlock() cfg.NodeLiveness.StartHeartbeat(ctx, stopper, func(ctx context.Context) { now := m.clock.Now() diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 489943111e13..b80153d16933 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -176,6 +176,12 @@ func (s *Store) SetReplicaScannerActive(active bool) { s.setScannerActive(active) } +// SetSplitQueueProcessTimeout sets the timeout for processing a replica in the +// split queue. +func (s *Store) SetSplitQueueProcessTimeout(dur time.Duration) { + s.splitQueue.SetProcessTimeout(dur) +} + // GetOrCreateReplica passes through to its lowercase sibling. func (s *Store) GetOrCreateReplica( ctx context.Context, @@ -347,6 +353,14 @@ func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp { return t } +// PermittingLargeSnapshots returns whether the replica is permitting large +// snapshots. +func (r *Replica) PermittingLargeSnapshots() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.mu.permitLargeSnapshots +} + // GetRaftLogSize returns the raft log size. func (r *Replica) GetRaftLogSize() int64 { r.mu.Lock() diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index 5481d438d1c9..65ea013c2097 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -302,6 +302,13 @@ func (bq *baseQueue) Disabled() bool { return bq.mu.disabled } +// SetProcessTimeout sets the timeout for processing a replica. +func (bq *baseQueue) SetProcessTimeout(dur time.Duration) { + bq.processMu.Lock() + bq.processTimeout = dur + bq.processMu.Unlock() +} + // Start launches a goroutine to process entries in the queue. The // provided stopper is used to finish processing. func (bq *baseQueue) Start(clock *hlc.Clock, stopper *stop.Stopper) { diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 053d01889b84..95813f7ce3cc 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -342,6 +342,14 @@ type Replica struct { minLeaseProposedTS hlc.Timestamp // Max bytes before split. maxBytes int64 + // Allow snapshots of any size instead of waiting for a split. Set to + // true when a split that is required for snapshots fails. Reset to + // false when the splits eventually succeed. The reasoning here is that + // in certain situations the split is dependent on the snapshot + // succeeding (either directly or transitively), so blocking the + // snapshot on the split can create a deadlock. + // TODO(nvanbenschoten): remove after #16954 is addressed. + permitLargeSnapshots bool // proposals stores the Raft in-flight commands which // originated at this Replica, i.e. all commands for which // propose has been called, but which have not yet @@ -1121,19 +1129,21 @@ func (r *Replica) getEstimatedBehindCountRLocked(raftStatus *raft.Status) int64 return 0 } -// GetMaxBytes atomically gets the range maximum byte limit. +// GetMaxBytes gets the range maximum byte limit. func (r *Replica) GetMaxBytes() int64 { r.mu.RLock() defer r.mu.RUnlock() return r.mu.maxBytes } -// SetMaxBytes atomically sets the maximum byte limit before -// split. This value is cached by the range for efficiency. +// SetMaxBytes sets the maximum byte limit before split. func (r *Replica) SetMaxBytes(maxBytes int64) { r.mu.Lock() defer r.mu.Unlock() r.mu.maxBytes = maxBytes + + // Whenever we change maxBytes, reset permitLargeSnapshots. + r.mu.permitLargeSnapshots = false } // IsFirstRange returns true if this is the first range. diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 9ec4f160dffb..fd4ce1e2db28 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -383,7 +383,19 @@ func (r *Replica) GetSnapshot( defer r.mu.RUnlock() rangeID := r.RangeID - if r.exceedsDoubleSplitSizeRLocked() { + // TODO(nvanbenschoten): We should never block snapshots indefinitely. Doing + // so can reduce a range's ability to recover from an under-replicated state + // and can cause unavailability even when a majority of replicas remain + // live. For instance, if a range gets too large to snapshot and requires a + // split in order to do so again, the loss of one up-to-date replica could + // cause it to permanently lose quorum. + // + // For now we still need this check because unbounded snapshots can result + // in OOM errors that crash entire nodes. However, once snapshots are + // streamed from disk to disk, never needing to buffer in-memory on the + // sending or receiving side, we should be able to remove any snapshot size + // limit. See #16954 for more. + if r.exceedsDoubleSplitSizeRLocked() && !r.mu.permitLargeSnapshots { maxBytes := r.mu.maxBytes size := r.mu.state.Stats.Total() err := errors.Errorf( diff --git a/pkg/storage/replica_raftstorage_test.go b/pkg/storage/replica_raftstorage_test.go index ea91bb625528..88488567bd7a 100644 --- a/pkg/storage/replica_raftstorage_test.go +++ b/pkg/storage/replica_raftstorage_test.go @@ -83,6 +83,7 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) { t.Fatal(err) } + // Snapshot should succeed. if snap, err := rep.GetSnapshot(context.Background(), "test"); err != nil { t.Fatal(err) } else { @@ -93,6 +94,7 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) { t.Fatal(err) } + // Snapshot should fail. const expected = "not generating test snapshot because replica is too large" if _, err := rep.GetSnapshot(context.Background(), "test"); !testutils.IsError(err, expected) { rep.mu.Lock() @@ -104,4 +106,16 @@ func TestSkipLargeReplicaSnapshot(t *testing.T) { rep.needsSplitBySize(), rep.exceedsDoubleSplitSizeRLocked(), err, ) } + + // Set the permitLargeSnapshots flag, which bypasses the snapshot size check. + rep.mu.Lock() + rep.mu.permitLargeSnapshots = true + rep.mu.Unlock() + + // Snapshot should succeed. + if snap, err := rep.GetSnapshot(context.Background(), "test"); err != nil { + t.Fatal(err) + } else { + snap.Close() + } } diff --git a/pkg/storage/split_queue.go b/pkg/storage/split_queue.go index 93a938ddf3ba..ce610da4dca7 100644 --- a/pkg/storage/split_queue.go +++ b/pkg/storage/split_queue.go @@ -113,6 +113,17 @@ func (sq *splitQueue) process(ctx context.Context, r *Replica, sysCfg config.Sys roachpb.AdminSplitRequest{}, desc, ); pErr != nil { + // If we failed to split the range and the range is too large to snapshot, + // set the permitLargeSnapshots flag so that we don't continue to block + // large snapshots. This could result in unavailability. The flag is reset + // whenever the split size is adjusted, which includes when the split + // finally succeeds. + // TODO(nvanbenschoten): remove after #16954. + r.mu.Lock() + defer r.mu.Unlock() + if r.exceedsDoubleSplitSizeRLocked() { + r.mu.permitLargeSnapshots = true + } return pErr.GoError() } else if !validSplitKey { // If we couldn't find a split key, set the max-bytes for the range to diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 607ef7518ac1..44b6890f1d97 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -724,6 +724,8 @@ type StoreTestingKnobs struct { // DisableTimeSeriesMaintenanceQueue disables the time series maintenance // queue. DisableTimeSeriesMaintenanceQueue bool + // DisableRaftSnapshotQueue disables the raft snapshot queue. + DisableRaftSnapshotQueue bool // DisableScanner disables the replica scanner. DisableScanner bool // DisablePeriodicGossips disables periodic gossiping. @@ -905,6 +907,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript if cfg.TestingKnobs.DisableTimeSeriesMaintenanceQueue { s.setTimeSeriesMaintenanceQueueActive(false) } + if cfg.TestingKnobs.DisableRaftSnapshotQueue { + s.setRaftSnapshotQueueActive(false) + } if cfg.TestingKnobs.DisableScanner { s.setScannerActive(false) }