Skip to content

Commit

Permalink
Add new test to check state communication
Browse files Browse the repository at this point in the history
Gossip accepts predicate to indentify condition of new messages,
this commit externalize predicate for gossip anti entripy messages
and adds test to ensure messages delivered correctly between peers.

Change-Id: I8da7e7cafada33bf8a7087cbdd1d1b87973cb5ab
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Jan 9, 2017
1 parent afd3884 commit 9e05f49
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
12 changes: 7 additions & 5 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ var logFormat = logging.MustStringFormatter(
`%{color}%{level} %{longfunc}():%{color:reset}(%{module})%{message}`,
)

var remoteStateMsgFilter = func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}


const (
defPollingPeriod = 200 * time.Millisecond
defAntiEntropyInterval = 10 * time.Second
Expand Down Expand Up @@ -89,7 +94,6 @@ type GossipStateProviderImpl struct {
// NewGossipStateProvider creates initialized instance of gossip state provider
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider {
logger, _ := logging.GetLogger("GossipStateProvider")
logging.SetLevel(logging.DEBUG, logger.Module)

gossipChan, _ := g.Accept(func(message interface{}) bool {
// Get only data messages
Expand All @@ -98,9 +102,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
}, false)

// Filter message which are only relevant for state transfer
_, commChan := g.Accept(func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}, true)
_, commChan := g.Accept(remoteStateMsgFilter, true)

height, err := committer.LedgerHeight()

Expand Down Expand Up @@ -139,7 +141,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer
s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next())
bytes, err := state.Bytes()
if err == nil {
s.logger.Debug("[VVV]: Updating gossip metadate state", state)
s.logger.Debug("Updating gossip metadate state", state)
g.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID))
} else {
s.logger.Errorf("Unable to serialize node meta state, error = %s", err)
Expand Down
67 changes: 67 additions & 0 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -28,12 +29,14 @@ import (
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/proto"
pcomm "github.com/hyperledger/fabric/protos/common"
"github.com/op/go-logging"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

var (
Expand Down Expand Up @@ -332,6 +335,70 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
}, 60*time.Second)
}

func TestGossipStateProvider_TestStateMessages(t *testing.T) {
viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node")
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()

bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0))
defer bootPeer.shutdown()

peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1))
defer peer.shutdown()

_, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true)
_, peerCh := peer.g.Accept(remoteStateMsgFilter, true)

wg := sync.WaitGroup{}
wg.Add(2)

go func() {
msg := <-bootCh
logger.Info("Bootstrap node got message, ", msg)
assert.True(t, msg.GetGossipMessage().GetStateRequest() != nil)
msg.Respond(&proto.GossipMessage{
Content: &proto.GossipMessage_StateResponse{&proto.RemoteStateResponse{nil}},
})
wg.Done()
}()

go func() {
msg := <-peerCh
logger.Info("Peer node got an answer, ", msg)
assert.True(t, msg.GetGossipMessage().GetStateResponse() != nil)
wg.Done()

}()

readyCh := make(chan struct{})
go func() {
wg.Wait()
readyCh <- struct{}{}
}()

time.Sleep(time.Duration(5) * time.Second)
logger.Info("Sending gossip message with remote state request")

chainID := common.ChainID(util.GetTestChainID())

peer.g.Send(&proto.GossipMessage{
Content: &proto.GossipMessage_StateRequest{&proto.RemoteStateRequest{nil}},
}, &comm.RemotePeer{peer.g.PeersOfChannel(chainID)[0].Endpoint, peer.g.PeersOfChannel(chainID)[0].PKIid})
logger.Info("Waiting until peers exchange messages")

select {
case <-readyCh:
{
logger.Info("[XXX]: Done!!!")

}
case <-time.After(time.Duration(10) * time.Second):
{
t.Fail()
}
}
}

func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) {
ch := make(chan struct{})
go func() {
Expand Down

0 comments on commit 9e05f49

Please sign in to comment.