Skip to content

Commit

Permalink
Merge "[FAB-5793] Block deliver if payload buffer is too full"
Browse files Browse the repository at this point in the history
  • Loading branch information
C0rWin authored and Gerrit Code Review committed Aug 19, 2017
2 parents b871903 + 06d9357 commit ea93c42
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 23 deletions.
4 changes: 2 additions & 2 deletions gossip/state/payloads_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (b *PayloadsBufferImpl) Pop() *proto.Payload {

// Size returns current number of payloads stored within buffer
func (b *PayloadsBufferImpl) Size() int {
b.mutex.Lock()
defer b.mutex.Unlock()
b.mutex.RLock()
defer b.mutex.RUnlock()
return len(b.buf)
}

Expand Down
28 changes: 25 additions & 3 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/spf13/viper"
)

// GossipStateProvider is the interface to acquire sequences of the ledger blocks
Expand All @@ -48,6 +49,11 @@ const (
defAntiEntropyMaxRetries = 3

defMaxBlockDistance = 100

blocking = true
nonBlocking = false

enqueueRetryInterval = time.Millisecond * 100
)

// GossipAdapter defines gossip/communication required interface for state provider
Expand Down Expand Up @@ -445,7 +451,7 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {

dataMsg := msg.GetDataMsg()
if dataMsg != nil {
if err := s.AddPayload(dataMsg.GetPayload()); err != nil {
if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil {
logger.Warning("Failed adding payload:", err)
return
}
Expand Down Expand Up @@ -668,8 +674,20 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {
return nil
}

// AddPayload add new payload into state
// AddPayload add new payload into state.
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
blockingMode := blocking
if viper.GetBool("peer.gossip.nonBlockingCommitMode") {
blockingMode = false
}
return s.addPayload(payload, blockingMode)
}

// addPayload add new payload into state. It may (or may not) block according to the
// given parameter. If it gets a block while in blocking mode - it would wait until
// the block is sent into the payloads buffer.
// Else - it may drop the block, if the payload buffer is too full.
func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error {
if payload == nil {
return errors.New("Given payload is nil")
}
Expand All @@ -679,10 +697,14 @@ func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
return fmt.Errorf("Failed obtaining ledger height: %v", err)
}

if payload.SeqNum-height >= defMaxBlockDistance {
if !blockingMode && payload.SeqNum-height >= defMaxBlockDistance {
return fmt.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
}

for blockingMode && s.payloads.Size() > defMaxBlockDistance*2 {
time.Sleep(enqueueRetryInterval)
}

return s.payloads.Push(payload)
}

Expand Down
108 changes: 90 additions & 18 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -147,7 +148,7 @@ func bootPeers(ids ...int) []string {
type peerNode struct {
port int
g gossip.Gossip
s GossipStateProvider
s *GossipStateProviderImpl
cs *cryptoServiceMock
commit committer.Committer
}
Expand All @@ -164,17 +165,21 @@ type mockCommitter struct {
}

func (mc *mockCommitter) Commit(block *pcomm.Block) error {
mc.Called(block)
mc.Lock()
m := mc.Mock
mc.Unlock()
m.Called(block)
return nil
}

func (mc *mockCommitter) LedgerHeight() (uint64, error) {
mc.Lock()
defer mc.Unlock()
if mc.Called().Get(1) == nil {
return mc.Called().Get(0).(uint64), nil
m := mc.Mock
mc.Unlock()
if m.Called().Get(1) == nil {
return m.Called().Get(0).(uint64), nil
}
return mc.Called().Get(0).(uint64), mc.Called().Get(1).(error)
return m.Called().Get(0).(uint64), m.Called().Get(1).(error)
}

func (mc *mockCommitter) GetBlocks(blockSeqs []uint64) []*pcomm.Block {
Expand Down Expand Up @@ -246,7 +251,7 @@ func newPeerNodeWithGossip(config *gossip.Config, committer committer.Committer,
return &peerNode{
port: config.BindPort,
g: g,
s: sp,
s: sp.(*GossipStateProviderImpl),
commit: committer,
cs: cs,
}
Expand All @@ -265,13 +270,13 @@ func TestNilDirectMsg(t *testing.T) {
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
p.s.(*GossipStateProviderImpl).handleStateRequest(nil)
p.s.(*GossipStateProviderImpl).directMessage(nil)
sMsg, _ := p.s.(*GossipStateProviderImpl).stateRequestMessage(uint64(10), uint64(8)).NoopSign()
p.s.handleStateRequest(nil)
p.s.directMessage(nil)
sMsg, _ := p.s.stateRequestMessage(uint64(10), uint64(8)).NoopSign()
req := &comm.ReceivedMessageImpl{
SignedGossipMessage: sMsg,
}
p.s.(*GossipStateProviderImpl).directMessage(req)
p.s.directMessage(req)
}

func TestNilAddPayload(t *testing.T) {
Expand Down Expand Up @@ -335,32 +340,32 @@ func TestOverPopulation(t *testing.T) {
for i := 1; i <= 4; i++ {
rawblock := pcomm.NewBlock(uint64(i), []byte{})
b, _ := pb.Marshal(rawblock)
assert.NoError(t, p.s.AddPayload(&proto.Payload{
assert.NoError(t, p.s.addPayload(&proto.Payload{
SeqNum: uint64(i),
Data: b,
}))
}, nonBlocking))
}

// Add payloads from 10 to defMaxBlockDistance, while we're missing blocks [5,9]
// Should succeed
for i := 10; i <= defMaxBlockDistance; i++ {
rawblock := pcomm.NewBlock(uint64(i), []byte{})
b, _ := pb.Marshal(rawblock)
assert.NoError(t, p.s.AddPayload(&proto.Payload{
assert.NoError(t, p.s.addPayload(&proto.Payload{
SeqNum: uint64(i),
Data: b,
}))
}, nonBlocking))
}

// Add payloads from defMaxBlockDistance + 2 to defMaxBlockDistance * 10
// Should fail.
for i := defMaxBlockDistance + 1; i <= defMaxBlockDistance*10; i++ {
rawblock := pcomm.NewBlock(uint64(i), []byte{})
b, _ := pb.Marshal(rawblock)
assert.Error(t, p.s.AddPayload(&proto.Payload{
assert.Error(t, p.s.addPayload(&proto.Payload{
SeqNum: uint64(i),
Data: b,
}))
}, nonBlocking))
}

// Ensure only blocks 1-4 were passed to the ledger
Expand All @@ -373,9 +378,76 @@ func TestOverPopulation(t *testing.T) {
assert.Equal(t, 5, i)

// Ensure we don't store too many blocks in memory
sp := p.s.(*GossipStateProviderImpl)
sp := p.s
assert.True(t, sp.payloads.Size() < defMaxBlockDistance)
}

func TestBlockingEnqueue(t *testing.T) {
// Scenario: In parallel, get blocks from gossip and from the orderer.
// The blocks from the orderer we get are X2 times the amount of blocks from gossip.
// The blocks we get from gossip are random indices, to maximize disruption.
mc := &mockCommitter{}
blocksPassedToLedger := make(chan uint64, 10)
mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
})
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

numBlocksReceived := 500
receivedBlockCount := 0
// Get a block from the orderer every 1ms
go func() {
for i := 1; i <= numBlocksReceived; i++ {
rawblock := pcomm.NewBlock(uint64(i), []byte{})
b, _ := pb.Marshal(rawblock)
block := &proto.Payload{
SeqNum: uint64(i),
Data: b,
}
p.s.AddPayload(block)
time.Sleep(time.Millisecond)
}
}()

// Get a block from gossip every 1ms too
go func() {
rand.Seed(time.Now().UnixNano())
for i := 1; i <= numBlocksReceived/2; i++ {
blockSeq := rand.Intn(numBlocksReceived)
rawblock := pcomm.NewBlock(uint64(blockSeq), []byte{})
b, _ := pb.Marshal(rawblock)
block := &proto.Payload{
SeqNum: uint64(blockSeq),
Data: b,
}
p.s.addPayload(block, nonBlocking)
time.Sleep(time.Millisecond)
}
}()

for {
receivedBlock := <-blocksPassedToLedger
receivedBlockCount++
m := mock.Mock{}
m.On("LedgerHeight", mock.Anything).Return(receivedBlock, nil)
m.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
})
mc.Lock()
mc.Mock = m
mc.Unlock()
assert.Equal(t, receivedBlock, uint64(receivedBlockCount))
if int(receivedBlockCount) == numBlocksReceived {
break
}
time.Sleep(time.Millisecond * 10)
t.Log("got block", receivedBlock)
}
}

func TestFailures(t *testing.T) {
Expand Down

0 comments on commit ea93c42

Please sign in to comment.