diff --git a/gossip/state/state.go b/gossip/state/state.go index 9904646f4ac..f2930d11a6f 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -51,6 +51,11 @@ var logFormat = logging.MustStringFormatter( `%{color}%{level} %{longfunc}():%{color:reset}(%{module})%{message}`, ) +var remoteStateMsgFilter = func(message interface{}) bool { + return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() +} + + const ( defPollingPeriod = 200 * time.Millisecond defAntiEntropyInterval = 10 * time.Second @@ -89,7 +94,6 @@ type GossipStateProviderImpl struct { // NewGossipStateProvider creates initialized instance of gossip state provider func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider { logger, _ := logging.GetLogger("GossipStateProvider") - logging.SetLevel(logging.DEBUG, logger.Module) gossipChan, _ := g.Accept(func(message interface{}) bool { // Get only data messages @@ -98,9 +102,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer }, false) // Filter message which are only relevant for state transfer - _, commChan := g.Accept(func(message interface{}) bool { - return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() - }, true) + _, commChan := g.Accept(remoteStateMsgFilter, true) height, err := committer.LedgerHeight() @@ -139,7 +141,7 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next()) bytes, err := state.Bytes() if err == nil { - s.logger.Debug("[VVV]: Updating gossip metadate state", state) + s.logger.Debug("Updating gossip metadate state", state) g.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID)) } else { s.logger.Errorf("Unable to serialize node meta state, error = %s", err) diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index 8336cf4c423..6f8da5f0cd4 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "strconv" + "sync" "testing" "time" @@ -28,12 +29,14 @@ import ( "github.com/hyperledger/fabric/core/ledger/ledgermgmt" "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" pcomm "github.com/hyperledger/fabric/protos/common" "github.com/op/go-logging" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" ) var ( @@ -332,6 +335,70 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { }, 60*time.Second) } +func TestGossipStateProvider_TestStateMessages(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0)) + defer bootPeer.shutdown() + + peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1)) + defer peer.shutdown() + + _, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true) + _, peerCh := peer.g.Accept(remoteStateMsgFilter, true) + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + msg := <-bootCh + logger.Info("Bootstrap node got message, ", msg) + assert.True(t, msg.GetGossipMessage().GetStateRequest() != nil) + msg.Respond(&proto.GossipMessage{ + Content: &proto.GossipMessage_StateResponse{&proto.RemoteStateResponse{nil}}, + }) + wg.Done() + }() + + go func() { + msg := <-peerCh + logger.Info("Peer node got an answer, ", msg) + assert.True(t, msg.GetGossipMessage().GetStateResponse() != nil) + wg.Done() + + }() + + readyCh := make(chan struct{}) + go func() { + wg.Wait() + readyCh <- struct{}{} + }() + + time.Sleep(time.Duration(5) * time.Second) + logger.Info("Sending gossip message with remote state request") + + chainID := common.ChainID(util.GetTestChainID()) + + peer.g.Send(&proto.GossipMessage{ + Content: &proto.GossipMessage_StateRequest{&proto.RemoteStateRequest{nil}}, + }, &comm.RemotePeer{peer.g.PeersOfChannel(chainID)[0].Endpoint, peer.g.PeersOfChannel(chainID)[0].PKIid}) + logger.Info("Waiting until peers exchange messages") + + select { + case <-readyCh: + { + logger.Info("[XXX]: Done!!!") + + } + case <-time.After(time.Duration(10) * time.Second): + { + t.Fail() + } + } +} + func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) { ch := make(chan struct{}) go func() {