cherry-pick-1.1: storage: add permitLargeSnapshots flag to replica #20906

Merged
6 changes: 1 addition & 5 deletions pkg/storage/client_raft_test.go
@@ -3635,13 +3635,9 @@ func TestInitRaftGroupOnRequest(t *testing.T) {
t.Fatal("replica should not be nil for RHS range")
}

// TODO(spencer): Raft messages seem to turn up
// occasionally on restart, which initialize the replica, so
// this is not a test failure. Not sure how to work around this
// problem.
// Verify the raft group isn't initialized yet.
if repl.IsRaftGroupInitialized() {
log.Errorf(context.TODO(), "expected raft group to be uninitialized")
t.Fatal("expected raft group to be uninitialized")
}

// Send an increment and verify that initializes the Raft group.
170 changes: 167 additions & 3 deletions pkg/storage/client_split_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
@@ -789,7 +790,7 @@ func TestStoreRangeSplitStatsWithMerges(t *testing.T) {
// fillRange writes keys with the given prefix and associated values
// until bytes bytes have been written or the given range has split.
func fillRange(
store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64, t *testing.T,
t *testing.T, store *storage.Store, rangeID roachpb.RangeID, prefix roachpb.Key, bytes int64,
) {
src := rand.New(rand.NewSource(0))
for {
@@ -802,7 +803,7 @@ func fillRange(
return
}
key := append(append([]byte(nil), prefix...), randutil.RandBytes(src, 100)...)
key = keys.MakeFamilyKey(key, 0)
key = keys.MakeFamilyKey(key, src.Uint32())
val := randutil.RandBytes(src, int(src.Int31n(1<<8)))
pArgs := putArgs(key, val)
_, pErr := client.SendWrappedWith(context.Background(), store, roachpb.Header{
@@ -861,7 +862,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
}

// Look in the range after prefix we're writing to.
fillRange(store, repl.RangeID, tableBoundary, maxBytes, t)
fillRange(t, store, repl.RangeID, tableBoundary, maxBytes)
}

// Verify that the range is in fact split.
@@ -912,6 +913,169 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
})
}

// TestStoreRangeSplitAfterLargeSnapshot tests a scenario where a range has
// grown too large to send a snapshot to a follower, but is unable to split
// because it cannot achieve quorum. The leader of the range should adapt to
// this, eventually permitting the large snapshot so that the range can recover
// and then split successfully.
func TestStoreRangeSplitAfterLargeSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set maxBytes to something small so we can exceed the maximum snapshot
// size without adding 2x64MB of data.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Create a three node cluster.
sc := storage.TestStoreConfig(nil)
sc.RaftElectionTimeoutTicks = 1000000
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)
store0 := mtc.stores[0]
forAllLiveStores := func(f func(*storage.Store)) {
for _, store := range mtc.stores {
if store != nil {
f(store)
}
}
}

// The behindNode falls behind far enough to require a snapshot.
const behindNode = 1
// The crashingNode crashes after its single range becomes too large to
// snapshot.
const crashingNode = 2

// Wait for initial splits.
t.Log("waiting for initial splits")
forAllLiveStores(func(store *storage.Store) {
store.SetRaftSnapshotQueueActive(true)
store.SetSplitQueueActive(true)
store.ForceSplitScanAndProcess()
})
if err := server.WaitForInitialSplits(store0.DB()); err != nil {
t.Fatal(err)
}

// Then do a write; we'll use this to determine when the dust has settled.
t.Log("performing first write")
keyPrefix := append(keys.UserTableDataMin, []byte("key")...)
repl := store0.LookupReplica(roachpb.RKey(keyPrefix), nil)
rangeID := repl.RangeID
header := roachpb.Header{RangeID: rangeID}
incArgs := incrementArgs(keyPrefix, 1)
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}

// Replicate the range we'll play with to the other nodes.
t.Log("replicating range")
mtc.replicateRange(rangeID, behindNode, crashingNode)
mtc.waitForValues(keyPrefix, []int64{1, 1, 1})

// Fill the range without allowing splits so that it will try to split once
// the splitQueue is re-enabled. Fill it past the snapshot size limit
// enforced in Replica.GetSnapshot. We do this before stopping behindNode so
// that the quotaPool does not throttle progress.
t.Log("filling range")
forAllLiveStores(func(store *storage.Store) {
store.SetSplitQueueActive(false)
})
fillRange(t, store0, rangeID, keyPrefix, 2*maxBytes+1)

// Turn off replica scanner and snapshot queue. We'll control queues
// directly from now on.
forAllLiveStores(func(store *storage.Store) {
store.SetReplicaScannerActive(false)
store.SetRaftSnapshotQueueActive(false)
})

// Stop behindNode so it falls behind and will require a snapshot.
t.Log("letting one follower fall behind")
mtc.stopStore(behindNode)

// Let behindNode fall behind.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{2, 1, 2})

// Truncate the replica's log. This ensures that the only way behindNode can
// recover is through a snapshot.
index, err := repl.GetLastIndex()
if err != nil {
t.Fatal(err)
}
truncArgs := truncateLogArgs(index+1, rangeID)
truncArgs.Key = repl.Desc().StartKey.AsRawKey()
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, truncArgs); pErr != nil {
t.Fatal(pErr)
}

// The range can still make forward progress.
if _, pErr := client.SendWrappedWith(context.Background(), store0, header, incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyPrefix, []int64{3, 1, 3})

// Determine the range count.
prevRangeCount := store0.ReplicaCount()

// Stop crashingNode so that we lose quorum and can no longer split.
// Bring behindNode back up.
t.Log("killing the other follower")
mtc.stopStore(crashingNode)
mtc.restartStore(behindNode)

// Reactivate the split queues and reduce their process timeout so that split
// attempts time out faster when quorum cannot be reached. Force a split, which
// should fail because it cannot achieve quorum. This in turn should set the
// permitLargeSnapshot flag.
t.Log("attempting a split without quorum; this should fail")
forAllLiveStores(func(store *storage.Store) {
store.SetSplitQueueProcessTimeout(1 * time.Second)
store.SetSplitQueueActive(true)
store.ForceSplitScanAndProcess()
})
testutils.SucceedsSoon(t, func() error {
if !repl.PermittingLargeSnapshots() {
return errors.Errorf("replica not permitting large snapshots")
}
return nil
})

// Now that the permitLargeSnapshot flag is set, we should see
// the range recover after behindNode is sent a snapshot.
t.Log("waiting for large snapshot to succeed")
forAllLiveStores(func(store *storage.Store) {
store.SetRaftSnapshotQueueActive(true)
store.ForceRaftSnapshotQueueProcess()
})
mtc.waitForValues(keyPrefix, []int64{3, 3, 3})

// Once the range has a majority of up-to-date nodes, it should be
// able to split. We first increment the manual clock to make sure
// any dangling intents left by previous splits expire.
t.Log("waiting for split to succeed")
mtc.manualClock.Increment(2*base.DefaultHeartbeatInterval.Nanoseconds() + 1)
forAllLiveStores(func(store *storage.Store) {
store.ForceSplitScanAndProcess()
})
testutils.SucceedsSoon(t, func() error {
if store0.ReplicaCount() < prevRangeCount+1 {
return errors.Errorf("expected new range created by split")
}
return nil
})

// Per the contract on multiTestContext.stopStore, we need to restart the
// stopped store before calling multiTestContext.Stop.
mtc.restartStore(crashingNode)
}

// TestStoreRangeSystemSplits verifies that splits are based on the contents of
// the SystemConfig span.
func TestStoreRangeSystemSplits(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/storage/client_test.go
@@ -874,6 +874,11 @@ func (m *multiTestContext) stopStore(i int) {

m.mu.Lock()
m.stoppers[i] = nil
// Break the transport breaker for this node so that messages sent between a
// store stopping and that store restarting will never remain in-flight in
// the transport and end up reaching the store. This has been the cause of
// flakiness in the past.
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Break()
m.senders[i].RemoveStore(m.stores[i])
m.stores[i] = nil
m.mu.Unlock()
@@ -905,6 +910,7 @@ func (m *multiTestContext) restartStore(i int) {
m.t.Fatal(err)
}
m.senders[i].AddStore(store)
m.transport.GetCircuitBreaker(m.idents[i].NodeID).Reset()
m.mu.Unlock()
cfg.NodeLiveness.StartHeartbeat(ctx, stopper, func(ctx context.Context) {
now := m.clock.Now()
14 changes: 14 additions & 0 deletions pkg/storage/helpers_test.go
@@ -176,6 +176,12 @@ func (s *Store) SetReplicaScannerActive(active bool) {
s.setScannerActive(active)
}

// SetSplitQueueProcessTimeout sets the timeout for processing a replica in the
// split queue.
func (s *Store) SetSplitQueueProcessTimeout(dur time.Duration) {
s.splitQueue.SetProcessTimeout(dur)
}

// GetOrCreateReplica passes through to its lowercase sibling.
func (s *Store) GetOrCreateReplica(
ctx context.Context,
@@ -347,6 +353,14 @@ func (r *Replica) GetTimestampCacheLowWater() hlc.Timestamp {
return t
}

// PermittingLargeSnapshots returns whether the replica is permitting large
// snapshots.
func (r *Replica) PermittingLargeSnapshots() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.permitLargeSnapshots
}

// GetRaftLogSize returns the raft log size.
func (r *Replica) GetRaftLogSize() int64 {
r.mu.Lock()
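
The PermittingLargeSnapshots accessor added above exposes the new r.mu.permitLargeSnapshots field to tests. The replica-side code that sets and consults the flag is not part of this excerpt; the following is only a rough sketch of the behavior the tests expect, and every name except permitLargeSnapshots is an assumption rather than this PR's actual code.

// Sketch only (not part of this diff): a snapshot size check that honors the
// permitLargeSnapshots flag. snapshotTooLarge and its parameters are
// illustrative names.
func (r *Replica) snapshotTooLarge(snapSize, rangeMaxBytes int64) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.mu.permitLargeSnapshots {
		// A split that could not achieve quorum set this flag; allow the
		// oversized snapshot so the follower can catch up and the range can
		// subsequently split.
		return false
	}
	// The test fills the range past 2*maxBytes before expecting the limit to
	// be hit, suggesting a threshold of roughly twice the zone's RangeMaxBytes.
	return snapSize > 2*rangeMaxBytes
}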
16 changes: 10 additions & 6 deletions pkg/storage/id_alloc.go
@@ -81,15 +81,19 @@ func newIDAllocator(
}

// Allocate allocates a new ID from the global KV DB.
func (ia *idAllocator) Allocate() (uint32, error) {
func (ia *idAllocator) Allocate(ctx context.Context) (uint32, error) {
ia.once.Do(ia.start)

id := <-ia.ids
// when the channel is closed, the zero value is returned.
if id == 0 {
return id, errors.Errorf("could not allocate ID; system is draining")
select {
case id := <-ia.ids:
// when the channel is closed, the zero value is returned.
if id == 0 {
return id, errors.Errorf("could not allocate ID; system is draining")
}
return id, nil
case <-ctx.Done():
return 0, ctx.Err()
}
return id, nil
}

func (ia *idAllocator) start() {
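
With the context parameter, Allocate can give up instead of blocking forever on the ID channel. A minimal in-package usage sketch, assuming the standard context and time packages are imported; the helper itself is hypothetical and not part of this PR.

// allocateWithTimeout is a hypothetical helper showing the new API: it
// returns ctx.Err() if no ID becomes available within the given duration,
// e.g. because the underlying KV increment is unavailable.
func allocateWithTimeout(ia *idAllocator, timeout time.Duration) (uint32, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return ia.Allocate(ctx)
}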
24 changes: 17 additions & 7 deletions pkg/storage/id_alloc_test.go
@@ -56,7 +56,7 @@ func TestIDAllocator(t *testing.T) {
for i := 0; i < maxI; i++ {
go func() {
for j := 0; j < maxJ; j++ {
id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
errChan <- err
allocd <- id
}
@@ -111,7 +111,7 @@ func TestIDAllocatorNegativeValue(t *testing.T) {
if err != nil {
t.Errorf("failed to create IDAllocator: %v", err)
}
value, err := idAlloc.Allocate()
value, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -158,7 +158,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
t.Errorf("failed to create IDAllocator: %v", err)
}

firstID, err := idAlloc.Allocate()
firstID, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -172,7 +172,7 @@
// Should be able to get the allocated IDs, and there will be one
// background allocateBlock to get ID continuously.
for i := 0; i < 8; i++ {
id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -194,7 +194,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
errChan <- nil
}

id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
errChan <- err
allocd <- id
}()
@@ -207,6 +207,16 @@ func TestAllocateErrorAndRecovery(t *testing.T) {
}
}

// Attempt a few allocations with a context timeout while allocations are
// blocked. All attempts should hit a context deadline exceeded error.
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
for i := 0; i < routines; i++ {
id, err := idAlloc.Allocate(ctx)
if id != 0 || err != context.DeadlineExceeded {
t.Errorf("expected context cancellation, found id=%d, err=%v", id, err)
}
}

// Make the IDAllocator valid again.
idAlloc.idKey.Store(keys.RangeIDGenerator)
// Check if the blocked allocations return expected ID.
@@ -226,7 +236,7 @@ func TestAllocateErrorAndRecovery(t *testing.T) {

// Check if the following allocations return expected ID.
for i := 0; i < routines; i++ {
id, err := idAlloc.Allocate()
id, err := idAlloc.Allocate(context.Background())
if err != nil {
t.Fatal(err)
}
@@ -254,7 +264,7 @@ func TestAllocateWithStopper(t *testing.T) {
return idAlloc
}()

if _, err := idAlloc.Allocate(); !testutils.IsError(err, "system is draining") {
if _, err := idAlloc.Allocate(context.Background()); !testutils.IsError(err, "system is draining") {
t.Errorf("unexpected error: %v", err)
}
}
7 changes: 7 additions & 0 deletions pkg/storage/queue.go
@@ -302,6 +302,13 @@ func (bq *baseQueue) Disabled() bool {
return bq.mu.disabled
}

// SetProcessTimeout sets the timeout for processing a replica.
func (bq *baseQueue) SetProcessTimeout(dur time.Duration) {
bq.processMu.Lock()
bq.processTimeout = dur
bq.processMu.Unlock()
}

// Start launches a goroutine to process entries in the queue. The
// provided stopper is used to finish processing.
func (bq *baseQueue) Start(clock *hlc.Clock, stopper *stop.Stopper) {
Expand Down
Loading