Skip to content

Commit

Permalink
Merge #40477
Browse files Browse the repository at this point in the history
40477: storage: Deflake TestReplicaReproposalWithNewLeaseIndexError r=irfansharif a=irfansharif

Fixes #39739. When inserting a new command into the proposal buffer, we
first reserve an index into the buffer's array and an offset from the
buffer's base lease index. If we subsequently fail to insert the
proposal, we should undo the index and offset reservation.

When we error out in `(*propBuf).Insert`, if we don't undo the index
reservation we expect a proposal at said index during consumption, but
don't find one.

Release note: None

Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
  • Loading branch information
craig[bot] and irfansharif committed Sep 4, 2019
2 parents d8c99dc + a84f0f4 commit 18bdfe1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type ProposalData struct {
// containing the command ID.
encodedCommand []byte

// quotaAlloc is the allocation retreived from the proposalQuota.
// quotaAlloc is the allocation retrieved from the proposalQuota.
// Once a proposal has been passed to raft modifying this field requires
// holding the raftMu.
quotaAlloc *quotapool.IntAlloc
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// stop trying to propose commands to raftGroup.
var firstErr error
for i, p := range buf {
if p == nil {
// If we run into an error during proposal insertion, we may have reserved
// an array index without actually inserting a proposal.
continue
}
buf[i] = nil // clear buffer

// Raft processing bookkeeping.
Expand Down
72 changes: 72 additions & 0 deletions pkg/storage/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -258,3 +259,74 @@ func TestProposalBufferRegistersAllOnProposalError(t *testing.T) {
require.Equal(t, propErr, err)
require.Equal(t, num, p.registered)
}

// TestProposalBufferRegistrationWithInsertionErrors tests that if during
// proposal insertion we reserve array indexes but are unable to actually insert
// them due to errors, we simply ignore said indexes when flushing proposals.
func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) {
defer leaktest.AfterTest(t)()

var p testProposer
var b propBuf
b.Init(&p)

num := propBufArrayMinSize / 2
for i := 0; i < num; i++ {
pd, data := newPropData(i%2 == 0)
_, err := b.Insert(pd, data)
require.Nil(t, err)
}

var insertErr = errors.New("failed insertion")
b.testing.leaseIndexFilter = func(*ProposalData) (indexOverride uint64, err error) {
return 0, insertErr
}

for i := 0; i < num; i++ {
pd, data := newPropData(i%2 == 0)
_, err := b.Insert(pd, data)
require.Equal(t, insertErr, err)
}
require.Equal(t, 2*num, b.Len())

require.Nil(t, b.flushLocked())

require.Equal(t, 0, b.Len())
require.Equal(t, num, p.registered)
}

// TestPropBufCnt tests the basic behavior of the counter maintained by the
// proposal buffer.
func TestPropBufCnt(t *testing.T) {
defer leaktest.AfterTest(t)()

var count propBufCnt
const numReqs = 10

reqLeaseInc := makePropBufCntReq(true)
reqLeaseNoInc := makePropBufCntReq(false)

for i := 0; i < numReqs; i++ {
count.update(reqLeaseInc)
}

res := count.read()
assert.Equal(t, numReqs, res.arrayLen())
assert.Equal(t, numReqs-1, res.arrayIndex())
assert.Equal(t, uint64(numReqs), res.leaseIndexOffset())

for i := 0; i < numReqs; i++ {
count.update(reqLeaseNoInc)
}

res = count.read()
assert.Equal(t, 2*numReqs, res.arrayLen())
assert.Equal(t, (2*numReqs)-1, res.arrayIndex())
assert.Equal(t, uint64(numReqs), res.leaseIndexOffset())

count.clear()
res = count.read()
assert.Equal(t, 0, res.arrayLen())
assert.Equal(t, -1, res.arrayIndex())
assert.Equal(t, uint64(0), res.leaseIndexOffset())
}

0 comments on commit 18bdfe1

Please sign in to comment.