From 9e05f4934becc01e38820914cd08b9a23c23eb96 Mon Sep 17 00:00:00 2001 From: Artem Barger Date: Sun, 8 Jan 2017 23:58:39 +0200 Subject: [PATCH] Add new test to check state communication Gossip accepts predicate to indentify condition of new messages, this commit externalize predicate for gossip anti entripy messages and adds test to ensure messages delivered correctly between peers. Change-Id: I8da7e7cafada33bf8a7087cbdd1d1b87973cb5ab Signed-off-by: Artem Barger --- gossip/state/state.go | 12 ++++--- gossip/state/state_test.go | 67 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/gossip/state/state.go b/gossip/state/state.go index 9904646f4ac..f2930d11a6f 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -51,6 +51,11 @@ var logFormat = logging.MustStringFormatter( `%{color}%{level} %{longfunc}():%{color:reset}(%{module})%{message}`, ) +var remoteStateMsgFilter = func(message interface{}) bool { + return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() +} + + const ( defPollingPeriod = 200 * time.Millisecond defAntiEntropyInterval = 10 * time.Second @@ -89,7 +94,6 @@ type GossipStateProviderImpl struct { // NewGossipStateProvider creates initialized instance of gossip state provider func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider { logger, _ := logging.GetLogger("GossipStateProvider") - logging.SetLevel(logging.DEBUG, logger.Module) gossipChan, _ := g.Accept(func(message interface{}) bool { // Get only data messages @@ -98,9 +102,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer }, false) // Filter message which are only relevant for state transfer - _, commChan := g.Accept(func(message interface{}) bool { - return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() - }, true) + _, commChan := g.Accept(remoteStateMsgFilter, true) height, err := committer.LedgerHeight() @@ -139,7 +141,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next()) bytes, err := state.Bytes() if err == nil { - s.logger.Debug("[VVV]: Updating gossip metadate state", state) + s.logger.Debug("Updating gossip metadate state", state) g.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID)) } else { s.logger.Errorf("Unable to serialize node meta state, error = %s", err) diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 8336cf4c423..6f8da5f0cd4 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "strconv" + "sync" "testing" "time" @@ -28,12 +29,14 @@ import ( "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" pcomm "github.com/hyperledger/fabric/protos/common" "github.com/op/go-logging" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" ) var ( @@ -332,6 +335,70 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { }, 60*time.Second) } +func TestGossipStateProvider_TestStateMessages(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0)) + defer bootPeer.shutdown() + + peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1)) + defer peer.shutdown() + + _, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true) + _, peerCh := peer.g.Accept(remoteStateMsgFilter, true) + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + msg := <-bootCh + logger.Info("Bootstrap node got message, ", msg) + assert.True(t, msg.GetGossipMessage().GetStateRequest() != nil) + msg.Respond(&proto.GossipMessage{ + Content: &proto.GossipMessage_StateResponse{&proto.RemoteStateResponse{nil}}, + }) + wg.Done() + }() + + go func() { + msg := <-peerCh + logger.Info("Peer node got an answer, ", msg) + assert.True(t, msg.GetGossipMessage().GetStateResponse() != nil) + wg.Done() + + }() + + readyCh := make(chan struct{}) + go func() { + wg.Wait() + readyCh <- struct{}{} + }() + + time.Sleep(time.Duration(5) * time.Second) + logger.Info("Sending gossip message with remote state request") + + chainID := common.ChainID(util.GetTestChainID()) + + peer.g.Send(&proto.GossipMessage{ + Content: &proto.GossipMessage_StateRequest{&proto.RemoteStateRequest{nil}}, + }, &comm.RemotePeer{peer.g.PeersOfChannel(chainID)[0].Endpoint, peer.g.PeersOfChannel(chainID)[0].PKIid}) + logger.Info("Waiting until peers exchange messages") + + select { + case <-readyCh: + { + logger.Info("[XXX]: Done!!!") + + } + case <-time.After(time.Duration(10) * time.Second): + { + t.Fail() + } + } +} + func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) { ch := make(chan struct{}) go func() {