From 5b2baa084d3657f702245448c7bfb0519de1d020 Mon Sep 17 00:00:00 2001 From: Gennady Laventman Date: Tue, 7 Mar 2017 11:56:32 +0200 Subject: [PATCH] [FAB-2777] Alive msg handling Move alive message store from gossip impl to discovy impl to keep all alive messages handling in one place Change-Id: I98157f1c208375b95db497f8fe8d1fac07ba97f3 Signed-off-by: Gennady Laventman --- gossip/discovery/discovery_impl.go | 10 ++++++++++ gossip/gossip/gossip_impl.go | 11 ----------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index b6d80441b55..231709b23c1 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -27,6 +27,7 @@ import ( "strings" "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/gossip/msgstore" "github.com/hyperledger/fabric/gossip/util" proto "github.com/hyperledger/fabric/protos/gossip" "github.com/op/go-logging" @@ -79,6 +80,8 @@ type gossipDiscoveryImpl struct { aliveMembership *util.MembershipStore deadMembership *util.MembershipStore + msgStore msgstore.MessageStore + bootstrapPeers []string comm CommService @@ -102,6 +105,7 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS id2Member: make(map[string]*NetworkMember), aliveMembership: util.NewMembershipStore(), deadMembership: util.NewMembershipStore(), + msgStore: msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}), crypt: crypt, comm: comm, lock: &sync.RWMutex{}, @@ -319,6 +323,12 @@ func (d *gossipDiscoveryImpl) handleMsgFromComm(m *proto.SignedGossipMessage) { } if m.IsAliveMsg() { + added := d.msgStore.Add(m) + if !added { + return + } + d.comm.Gossip(m) + d.handleAliveMessage(m) return } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 28a068f9114..734d78aa59b 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -70,7 +70,6 @@ type gossipServiceImpl struct { chanState *channelState disSecAdap *discoverySecurityAdapter mcs api.MessageCryptoService - aliveMsgStore msgstore.MessageStore stateInfoMsgStore msgstore.MessageStore } @@ -110,8 +109,6 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis includeIdentityPeriod: time.Now().Add(conf.PublishCertPeriod), } - g.aliveMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) - g.chanState = newChannelState(g) g.emitter = newBatchingEmitter(conf.PropagateIterations, conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency, @@ -298,14 +295,6 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) { return } - if msg.IsAliveMsg() { - added := g.aliveMsgStore.Add(msg) - if !added { - return - } - g.emitter.Add(msg) - } - if msg.IsChannelRestricted() { if gc := g.chanState.getGossipChannelByChainID(msg.Channel); gc == nil { // If we're not in the channel but we should forward to peers of our org