Skip to content

Commit

Permalink
storage: Deflake TestReplicaReproposalWithNewLeaseIndexError
Browse files Browse the repository at this point in the history
Fixes cockroachdb#39739. When inserting a new command into the proposal buffer, we
first reserve an index into the buffer's array. If we subsequently fail
to insert the proposal, we should account for this during proposal
registration.

Additionally add `TestPropBufCnt` to test basic behaviour of the counter
maintained by the proposal buffer.

Release note: None
  • Loading branch information
irfansharif committed Sep 4, 2019
1 parent 79d97a7 commit 267cb6f
Show file tree
Hide file tree
Showing 11 changed files with 405,037 additions and 1 deletion.
4,327 changes: 4,327 additions & 0 deletions pkg/sql/exec/cast.eg.go

Large diffs are not rendered by default.

400 changes: 400 additions & 0 deletions pkg/sql/exec/mergejoinbase.eg.go

Large diffs are not rendered by default.

84,848 changes: 84,848 additions & 0 deletions pkg/sql/exec/mergejoiner_fullouter.eg.go

Large diffs are not rendered by default.

53,491 changes: 53,491 additions & 0 deletions pkg/sql/exec/mergejoiner_inner.eg.go

Large diffs are not rendered by default.

69,485 changes: 69,485 additions & 0 deletions pkg/sql/exec/mergejoiner_leftanti.eg.go

Large diffs are not rendered by default.

69,445 changes: 69,445 additions & 0 deletions pkg/sql/exec/mergejoiner_leftouter.eg.go

Large diffs are not rendered by default.

54,059 changes: 54,059 additions & 0 deletions pkg/sql/exec/mergejoiner_leftsemi.eg.go

Large diffs are not rendered by default.

68,905 changes: 68,905 additions & 0 deletions pkg/sql/exec/mergejoiner_rightouter.eg.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type ProposalData struct {
// containing the command ID.
encodedCommand []byte

// quotaAlloc is the allocation retreived from the proposalQuota.
// quotaAlloc is the allocation retrieved from the proposalQuota.
// Once a proposal has been passed to raft modifying this field requires
// holding the raftMu.
quotaAlloc *quotapool.IntAlloc
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
// stop trying to propose commands to raftGroup.
var firstErr error
for i, p := range buf {
if p == nil {
// If we run into an error during proposal insertion, we may have reserved
// an array index without actually inserting a proposal.
continue
}
buf[i] = nil // clear buffer

// Raft processing bookkeeping.
Expand Down
71 changes: 71 additions & 0 deletions pkg/storage/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -258,3 +259,73 @@ func TestProposalBufferRegistersAllOnProposalError(t *testing.T) {
require.Equal(t, propErr, err)
require.Equal(t, num, p.registered)
}

// TestProposalBufferRegistrationWithInsertionErrors tests that if during
// proposal insertion we reserve array indexes but are unable to actually insert
// them due to errors, we simply ignore said indexes when flushing proposals.
func TestProposalBufferRegistrationWithInsertionErrors(t *testing.T) {
defer leaktest.AfterTest(t)()

var p testProposer
var b propBuf
b.Init(&p)

num := propBufArrayMinSize
for i := 0; i < num / 2; i++ {
pd, data := newPropData(i % 2 == 0)
_, err := b.Insert(pd, data)
require.Nil(t, err)
}

var insertErr = errors.New("failed insertion")
b.testing.leaseIndexFilter = func(*ProposalData) (indexOverride uint64, err error) {
return 0, insertErr
}

for i := 0; i < num / 2; i++ {
pd, data := newPropData(i % 2 == 0)
_, err := b.Insert(pd, data)
require.Equal(t, insertErr, err)
}
require.Equal(t, propBufArrayMinSize, b.Len())

require.Nil(t, b.flushLocked())

require.Equal(t, 0, b.Len())
require.Equal(t, propBufArrayMinSize / 2, p.registered)
}

// TestPropBufCnt tests the basic behaviour of the counter maintained by the
// proposal buffer.
func TestPropBufCnt(t *testing.T) {
defer leaktest.AfterTest(t)()

var count propBufCnt
const numReqs = 10

reqLeaseInc := makePropBufCntReq(true)
reqLeaseNoInc := makePropBufCntReq(false)

for i := 0; i < numReqs; i++ {
count.update(reqLeaseInc)
}

res := count.read()
assert.Equal(t, numReqs, res.arrayLen())
assert.Equal(t, numReqs - 1, res.arrayIndex())
assert.Equal(t, uint64(numReqs), res.leaseIndexOffset())

for i := 0; i < numReqs; i++ {
count.update(reqLeaseNoInc)
}

res = count.read()
assert.Equal(t, 2 * numReqs, res.arrayLen() )
assert.Equal(t, (2 * numReqs) - 1, res.arrayIndex())
assert.Equal(t, uint64(numReqs), res.leaseIndexOffset())

res = count.clear()
assert.Equal(t, 0, res.arrayLen())
assert.Equal(t, -1, res.arrayIndex())
assert.Equal(t, uint64(numReqs), res.leaseIndexOffset())
}

0 comments on commit 267cb6f

Please sign in to comment.