diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go index c5e8d46e75f..82e8c53a15e 100644 --- a/gossip/gossip/channel/channel.go +++ b/gossip/gossip/channel/channel.go @@ -119,6 +119,7 @@ type gossipChannel struct { joinMsg api.JoinChannelMessage blockMsgStore msgstore.MessageStore stateInfoMsgStore msgstore.MessageStore + leaderMsgStore msgstore.MessageStore chainID common.ChainID blocksPuller pull.Mediator logger *util.Logger @@ -166,6 +167,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap gc.stateInfoMsgStore = NewStateInfoMessageStore() gc.blocksPuller = gc.createBlockPuller() + gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {}) gc.ConfigureChannel(joinMsg) @@ -419,6 +421,15 @@ func (gc *gossipChannel) HandleMessage(msg comm.ReceivedMessage) { } gc.blocksPuller.HandleMessage(msg) } + + if m.IsLeadershipMsg() { + // Handling leadership message + added := gc.leaderMsgStore.Add(m) + if added { + gc.DeMultiplex(m) + } + + } } func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) { diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index e4b947d8238..3d034154d0d 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -391,6 +391,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { var blocks []*proto.GossipMessage var stateInfoMsgs []*proto.GossipMessage var orgMsgs []*proto.GossipMessage + var leadershipMsgs []*proto.GossipMessage isABlock := func(o interface{}) bool { return o.(*proto.GossipMessage).IsDataMsg() @@ -401,6 +402,9 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { isOrgRestricted := func(o interface{}) bool { return o.(*proto.GossipMessage).IsOrgRestricted() } + isLeadershipMsg := func(o interface{}) bool { + return o.(*proto.GossipMessage).IsLeadershipMsg() + } // Gossip blocks blocks, msgs = partitionMessages(isABlock, msgs) @@ -414,6 +418,12 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) { return gc.IsMemberInChan }) + //Gossip Leadership messages + leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs) + g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter { + return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg) + }) + // Gossip messages restricted to our org orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs) peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg) @@ -451,10 +461,17 @@ func (g *gossipServiceImpl) gossipInChan(messages []*proto.GossipMessage, chanRo continue } // Select the peers to send the messages to - peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), chanRoutingFactory(gc)) + // For leadership messages we will select all peers that pass routing factory - e.g. all peers in channel and org + membership := g.disc.GetMembership() + allPeersInCh := filter.SelectPeers(len(membership), membership, chanRoutingFactory(gc)) + peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, membership, chanRoutingFactory(gc)) // Send the messages to the remote peers for _, msg := range messagesOfChannel { - g.comm.Send(msg, peers2Send...) + if msg.IsLeadershipMsg() { + g.comm.Send(msg, allPeersInCh...) + } else { + g.comm.Send(msg, peers2Send...) + } } } } diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index b904fba850c..a47e54a0e1d 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -62,6 +62,13 @@ func acceptData(m interface{}) bool { return false } +func acceptLeadershp(message interface{}) bool { + validMsg := message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG && + message.(*proto.GossipMessage).IsLeadershipMsg() + + return validMsg +} + type joinChanMsg struct { anchorPeers []api.AnchorPeer } @@ -489,6 +496,34 @@ func TestDissemination(t *testing.T) { for i := 0; i < n; i++ { assert.Equal(t, msgsCount2Send, receivedMessages[i]) } + + //Sending leadership messages + receivedLeadershipMessages := make([]int, n) + wgLeadership := sync.WaitGroup{} + wgLeadership.Add(n) + for i := 1; i <= n; i++ { + leadershipChan, _ := peers[i-1].Accept(acceptLeadershp, false) + go func(index int, ch <-chan *proto.GossipMessage) { + defer wgLeadership.Done() + <-ch + receivedLeadershipMessages[index]++ + }(i-1, leadershipChan) + } + + seqNum := 0 + incTime := uint64(time.Now().UnixNano()) + t3 := time.Now() + + leadershipMsg := createLeadershipMsg(true, common.ChainID("A"), incTime, uint64(seqNum), boot.(*gossipServiceImpl).conf.SelfEndpoint, boot.(*gossipServiceImpl).comm.GetPKIid()) + boot.Gossip(leadershipMsg) + + waitUntilOrFailBlocking(t, wgLeadership.Wait) + t.Log("Leadership message dissemination took", time.Since(t3)) + + for i := 0; i < n; i++ { + assert.Equal(t, 1, receivedLeadershipMessages[i]) + } + t.Log("Stopping peers") stop := func() { @@ -784,6 +819,78 @@ func TestEndedGoroutines(t *testing.T) { ensureGoroutineExit(t) } +//func TestLeadershipMsgDissemination(t *testing.T) { +// t.Parallel() +// portPrefix := 3610 +// t1 := time.Now() +// // Scenario: 20 nodes and a bootstrap node. +// // The bootstrap node sends 10 leadership messages and we count +// // that each node got 10 messages after a few seconds +// +// stopped := int32(0) +// go waitForTestCompletion(&stopped, t) +// +// n := 10 +// msgsCount2Send := 10 +// boot := newGossipInstance(portPrefix, 0, 100) +// boot.JoinChan(&joinChanMsg{}, common.ChainID("A")) +// boot.UpdateChannelMetadata([]byte{}, common.ChainID("A")) +// +// peers := make([]Gossip, n) +// receivedMessages := make([]int, n) +// wg := sync.WaitGroup{} +// wg.Add(n) +// for i := 1; i <= n; i++ { +// pI := newGossipInstance(portPrefix, i, 100, 0) +// peers[i-1] = pI +// pI.JoinChan(&joinChanMsg{}, common.ChainID("A")) +// pI.UpdateChannelMetadata([]byte{}, common.ChainID("A")) +// acceptChan, _ := pI.Accept(acceptLeadershp, false) +// go func(index int, ch <-chan *proto.GossipMessage) { +// defer wg.Done() +// for j := 0; j < msgsCount2Send; j++ { +// <-ch +// receivedMessages[index]++ +// } +// }(i-1, acceptChan) +// } +// +// membershipTime := time.Now() +// waitUntilOrFail(t, checkPeersMembership(peers, n)) +// t.Log("Membership establishment took", time.Since(membershipTime)) +// +// seqNum := 0 +// incTime := uint64(time.Now().UnixNano()) +// +// for i := 1; i <= msgsCount2Send; i++ { +// seqNum++ +// leadershipMsg := createLeadershipMsg(true, common.ChainID("A"), incTime, uint64(seqNum), boot.(*gossipServiceImpl).conf.SelfEndpoint, boot.(*gossipServiceImpl).comm.GetPKIid()) +// boot.Gossip(leadershipMsg) +// time.Sleep(time.Duration(500) * time.Millisecond) +// } +// +// t2 := time.Now() +// waitUntilOrFailBlocking(t, wg.Wait) +// t.Log("Leadership message dissemination took", time.Since(t2)) +// +// for i := 0; i < n; i++ { +// assert.Equal(t, msgsCount2Send, receivedMessages[i]) +// } +// t.Log("Stopping peers") +// +// stop := func() { +// stopPeers(append(peers, boot)) +// } +// +// stopTime := time.Now() +// waitUntilOrFailBlocking(t, stop) +// t.Log("Stop took", time.Since(stopTime)) +// t.Log("Took", time.Since(t1)) +// atomic.StoreInt32(&stopped, int32(1)) +// fmt.Println("<<>>") +// testWG.Done() +//} + func createDataMsg(seqnum uint64, data []byte, hash string, channel common.ChainID) *proto.GossipMessage { return &proto.GossipMessage{ Channel: []byte(channel), @@ -801,6 +908,32 @@ func createDataMsg(seqnum uint64, data []byte, hash string, channel common.Chain } } +func createLeadershipMsg(isDeclaration bool, channel common.ChainID, incTime uint64, seqNum uint64, endpoint string, pkiid []byte) *proto.GossipMessage { + + metadata := []byte{} + metadata = strconv.AppendBool(metadata, isDeclaration) + + leadershipMsg := &proto.LeadershipMessage{ + Membership: &proto.Member{ + PkiID: pkiid, + Endpoint: endpoint, + Metadata: metadata, + }, + Timestamp: &proto.PeerTime{ + IncNumber: incTime, + SeqNum: seqNum, + }, + } + + msg := &proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_CHAN_AND_ORG, + Content: &proto.GossipMessage_LeadershipMsg{LeadershipMsg: leadershipMsg}, + Channel: channel, + } + return msg +} + type goroutinePredicate func(g goroutine) bool var connectionLeak = func(g goroutine) bool { diff --git a/gossip/proto/extensions.go b/gossip/proto/extensions.go index 1721b66ff3d..69e933d9f72 100644 --- a/gossip/proto/extensions.go +++ b/gossip/proto/extensions.go @@ -61,6 +61,10 @@ func (mc *msgComparator) invalidationPolicy(this interface{}, that interface{}) return mc.identityInvalidationPolicy(thisMsg.GetPeerIdentity(), thatMsg.GetPeerIdentity()) } + if thisMsg.IsLeadershipMsg() && thatMsg.IsLeadershipMsg() { + return leaderInvalidationPolicy(thisMsg.GetLeadershipMsg(), thatMsg.GetLeadershipMsg()) + } + return common.MessageNoAction } @@ -106,6 +110,14 @@ func aliveInvalidationPolicy(thisMsg *AliveMessage, thatMsg *AliveMessage) commo return compareTimestamps(thisMsg.Timestamp, thatMsg.Timestamp) } +func leaderInvalidationPolicy(thisMsg *LeadershipMessage, thatMsg *LeadershipMessage) common.InvalidationResult { + if !bytes.Equal(thisMsg.Membership.PkiID, thatMsg.Membership.PkiID) { + return common.MessageNoAction + } + + return compareTimestamps(thisMsg.Timestamp, thatMsg.Timestamp) +} + func compareTimestamps(thisTS *PeerTime, thatTS *PeerTime) common.InvalidationResult { if thisTS.IncNumber == thatTS.IncNumber { if thisTS.SeqNum > thatTS.SeqNum { @@ -279,6 +291,13 @@ func (m *GossipMessage) IsTagLegal() error { return nil } + if m.IsLeadershipMsg() { + if m.Tag != GossipMessage_CHAN_AND_ORG { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_CHAN_AND_ORG)]) + } + return nil + } + return fmt.Errorf("Unknown message type: %v", m) }