storage: release quota on failed Raft proposals
Fixes cockroachdb#34180.
Fixes cockroachdb#35493.
Fixes cockroachdb#36983.
Fixes cockroachdb#37108.
Fixes cockroachdb#37371.
Fixes cockroachdb#37384.
Fixes cockroachdb#37551.
Fixes cockroachdb#37879.
Fixes cockroachdb#38095.
Fixes cockroachdb#38131.
Fixes cockroachdb#38136.
Fixes cockroachdb#38549.
Fixes cockroachdb#38552.
Fixes cockroachdb#38555.
Fixes cockroachdb#38560.
Fixes cockroachdb#38562.
Fixes cockroachdb#38563.
Fixes cockroachdb#38569.
Fixes cockroachdb#38578.
Fixes cockroachdb#38600.

_A lot of the early issues fixed by this change had prior failures, but nothing
very recent or actionable. I think it's worth closing them now that they
should be fixed, at least in the short term._

This fixes a bug introduced in 1ff3556 where Raft proposal quota is
no longer released when Replica.propose fails. This used to happen
[here](cockroachdb@1ff3556#diff-4315c7ebf8b8bf7bda469e1e7be82690L316),
but that code was accidentally lost in the rewrite.
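As a rough sketch of the pattern the fix below restores (simplified stand-in types, not CockroachDB's actual quota pool API), the key invariant is that once quota has been acquired for a proposal, every subsequent failure path must release it:

```go
package main

import (
	"errors"
	"fmt"
)

// quotaPool is a simplified stand-in for a range's proposal quota pool. The
// type and method names here are illustrative only.
type quotaPool struct {
	available int64
}

func (qp *quotaPool) acquire(n int64) error {
	if n > qp.available {
		return errors.New("insufficient quota")
	}
	qp.available -= n
	return nil
}

func (qp *quotaPool) release(n int64) {
	qp.available += n
}

// propose sketches the restored pattern: after quota has been acquired for a
// proposal, any failure before the proposal reaches Raft must release that
// quota again, here via a deferred cleanup keyed off the named error return.
func propose(qp *quotaPool, quotaSize int64, insert func() error) (err error) {
	if err := qp.acquire(quotaSize); err != nil {
		return err // nothing acquired, nothing to clean up
	}
	defer func() {
		if err != nil {
			// This is the release that was accidentally dropped in 1ff3556.
			qp.release(quotaSize)
		}
	}()
	return insert()
}

func main() {
	qp := &quotaPool{available: 1 << 20}
	err := propose(qp, 512, func() error {
		return errors.New("proposal rejected")
	})
	fmt.Println(err)          // the proposal failure still surfaces to the caller
	fmt.Println(qp.available) // but the 512 bytes of quota are back in the pool
}
```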

I tracked this down by running a series of `import/tpch/nodes=4` and
`scrub/all-checks/tpcc/w=100` roachtests. About half the time, the
import would stall after a few hours and the roachtest health reports
would start logging lines like: `n1/s1  2.00  metrics  requests.slow.latch`.
I then traced the stalled latch acquisition to a stalled proposal quota
acquisition by a conflicting command. The range debug page showed the following:

<image>

We see that the leaseholder of the Range has no pending commands
but also no available proposal quota. This indicates a proposal
quota leak, which led me to find the missing release on this
error path.

The (now confirmed) theory for what went wrong in these roachtests is that
they are performing imports, which generate a large number of AddSSTRequests.
These requests are typically larger than the available proposal quota
for a range, meaning that they request all of its available quota. The
effect of this is that if even a single byte of quota is leaked, the entire
range will seize up and stall the next time an AddSSTRequest is issued.
Instrumentation revealed that a ChangeReplicas request with a quota size
equal to the leaked amount was failing due to the error:
```
received invalid ChangeReplicasTrigger REMOVE_REPLICA((n3,s3):3): updated=[(n1,s1):1 (n4,s4):2 (n2,s2):4] next=5 to remove self (leaseholder)
```
Because of the missing error handling, this quota was never released back
into the pool, causing future requests to get stuck indefinitely waiting for
the leaked quota and stalling the entire import.
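To make the stall mechanics concrete, here is a toy illustration (the pool and request sizes are made-up numbers, not actual cluster settings): because an import-sized request is capped at the full pool size, a pool that has leaked even one byte can never satisfy it again.

```go
package main

import "fmt"

func main() {
	// Illustrative sizes only; the real proposal quota pool size is
	// a cluster setting, not these constants.
	const poolSize int64 = 1 << 20 // total proposal quota for the range
	const leaked int64 = 1         // a single byte acquired but never released

	available := poolSize - leaked

	// An import-sized AddSSTRequest is larger than the pool, so its quota
	// request is effectively capped at the full pool size.
	request := poolSize

	if request > available {
		fmt.Printf("request for %d bytes can never be satisfied: at most %d of %d bytes will ever be free again\n",
			request, available, poolSize)
	}
}
```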

Release note: None
nvanbenschoten committed Jul 3, 2019
1 parent ef02100 commit ba3813c
Showing 4 changed files with 82 additions and 18 deletions.
10 changes: 9 additions & 1 deletion pkg/storage/replica.go
@@ -571,8 +571,16 @@ func (r *Replica) String() string {
return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr)
}

// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
// cleanupFailedProposal cleans up after a proposal that has failed. It
// clears any references to the proposal and releases associated quota.
func (r *Replica) cleanupFailedProposal(p *ProposalData) {
r.mu.Lock()
defer r.mu.Unlock()
r.cleanupFailedProposalLocked(p)
}

// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires
// the Replica mutex to be exclusively held.
func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
// Clear the proposal from the proposals map. May be a no-op if the
// proposal has not yet been inserted into the map.
6 changes: 4 additions & 2 deletions pkg/storage/replica_proposal_buf.go
@@ -136,7 +136,7 @@ type propBuf struct {
testing struct {
// leaseIndexFilter can be used by tests to override the max lease index
// assigned to a proposal by returning a non-zero lease index.
leaseIndexFilter func(*ProposalData) (indexOverride uint64)
leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error)
// submitProposalFilter can be used by tests to observe and optionally
// drop Raft proposals before they are handed to etcd/raft to begin the
// process of replication. Dropped proposals are still eligible to be
@@ -208,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) {
// Assign the command's maximum lease index.
p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset()
if filter := b.testing.leaseIndexFilter; filter != nil {
if override := filter(p); override != 0 {
if override, err := filter(p); err != nil {
return 0, err
} else if override != 0 {
p.command.MaxLeaseIndex = override
}
}
23 changes: 17 additions & 6 deletions pkg/storage/replica_raft.go
@@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose(
if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil {
return nil, nil, 0, roachpb.NewError(err)
}
// Make sure we clean up the proposal if we fail to insert it into the
// proposal buffer successfully. This ensures that we always release any
// quota that we acquire.
defer func() {
if pErr != nil {
r.cleanupFailedProposal(proposal)
}
}()

if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil {
filterArgs := storagebase.ProposalFilterArgs{
@@ -230,11 +238,13 @@
return proposalCh, abandon, maxLeaseIndex, nil
}

// propose starts tracking a command and proposes it to raft. If
// this method succeeds, the caller is responsible for eventually
// removing the proposal from the pending map (on success, in
// processRaftCommand, or on failure via cleanupFailedProposalLocked).
func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
// propose encodes a command, starts tracking it, and proposes it to raft. The
// method is also responsible for assigning the command its maximum lease index.
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) {
// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a RaftCommandFooter.
@@ -258,7 +268,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) {
// leases can stay in such a state for a very long time when using epoch-
// based range leases). This shouldn't happen often, but has been seen
// before (#12591).
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID {
replID := p.command.ProposerReplica.ReplicaID
if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID {
msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt)
log.Error(p.ctx, msg)
return 0, roachpb.NewErrorf("%s: %s", r, msg)
61 changes: 52 additions & 9 deletions pkg/storage/replica_test.go
@@ -6532,6 +6532,49 @@ func TestReplicaDestroy(t *testing.T) {
}
}

// TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by
// proposals is released back into the quota pool if the proposal fails before
// being submitted to Raft.
func TestQuotaPoolReleasedOnFailedProposal(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

// Flush a write all the way through the Raft proposal pipeline to ensure
// that the replica becomes the Raft leader and sets up its quota pool.
iArgs := incrementArgs([]byte("a"), 1)
if _, pErr := tc.SendWrapped(&iArgs); pErr != nil {
t.Fatal(pErr)
}

type magicKey struct{}
var minQuotaSize int64
propErr := errors.New("proposal error")

tc.repl.mu.Lock()
tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
if v := p.ctx.Value(magicKey{}); v != nil {
minQuotaSize = tc.repl.mu.proposalQuota.approximateQuota() + p.quotaSize
return 0, propErr
}
return 0, nil
}
tc.repl.mu.Unlock()

var ba roachpb.BatchRequest
pArg := putArgs(roachpb.Key("a"), make([]byte, 1<<10))
ba.Add(&pArg)
ctx := context.WithValue(context.Background(), magicKey{}, "foo")
if _, pErr := tc.Sender().Send(ctx, ba); !testutils.IsPError(pErr, propErr.Error()) {
t.Fatalf("expected error %v, found %v", propErr, pErr)
}
if curQuota := tc.repl.QuotaAvailable(); curQuota < minQuotaSize {
t.Fatalf("proposal quota not released: found=%d, want=%d", curQuota, minQuotaSize)
}
}

// TestQuotaPoolAccessOnDestroyedReplica tests the occurrence of #17303 where
// following a leader replica getting destroyed, the scheduling of
// handleRaftReady twice on the replica would cause a panic when
@@ -7280,13 +7323,13 @@ func TestReplicaRetryRaftProposal(t *testing.T) {
var wrongLeaseIndex uint64 // populated below

tc.repl.mu.Lock()
tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
if v := p.ctx.Value(magicKey{}); v != nil {
if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 {
return wrongLeaseIndex
return wrongLeaseIndex, nil
}
}
return 0
return 0, nil
}
tc.repl.mu.Unlock()

@@ -7731,12 +7774,12 @@ func TestReplicaRefreshMultiple(t *testing.T) {
t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
}
assigned := false
repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
if p == proposal && !assigned {
assigned = true
return ai - 1
return ai - 1, nil
}
return 0
return 0, nil
}
repl.mu.Unlock()

@@ -7877,16 +7920,16 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {

r := tc.repl
r.mu.Lock()
r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) {
r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) {
// We're going to recognize the first time the command for the
// EndTransaction is proposed and we're going to hackily decrease its
// MaxLeaseIndex, so that the processing gets rejected further on.
ut := p.Local.UpdatedTxns
if atomic.LoadInt64(&proposalRecognized) == 0 && ut != nil && len(*ut) == 1 && (*ut)[0].ID == txn.ID {
atomic.StoreInt64(&proposalRecognized, 1)
return p.command.MaxLeaseIndex - 1
return p.command.MaxLeaseIndex - 1, nil
}
return 0
return 0, nil
}
r.mu.Unlock()

