Skip to content

Commit

Permalink
FAB-1286 Support for leadership message - push
Browse files Browse the repository at this point in the history
Naive implementation - leadership messages sent to all peers
in membership.

Change-Id: Ifab3a75ab0601ab7afbe8883283bc86760760cf2
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed Jan 30, 2017
1 parent e0411a6 commit a429da3
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 2 deletions.
11 changes: 11 additions & 0 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type gossipChannel struct {
joinMsg api.JoinChannelMessage
blockMsgStore msgstore.MessageStore
stateInfoMsgStore msgstore.MessageStore
leaderMsgStore msgstore.MessageStore
chainID common.ChainID
blocksPuller pull.Mediator
logger *util.Logger
Expand Down Expand Up @@ -166,6 +167,7 @@ func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adap

gc.stateInfoMsgStore = NewStateInfoMessageStore()
gc.blocksPuller = gc.createBlockPuller()
gc.leaderMsgStore = msgstore.NewMessageStore(proto.NewGossipMessageComparator(0), func(m interface{}) {})

gc.ConfigureChannel(joinMsg)

Expand Down Expand Up @@ -419,6 +421,15 @@ func (gc *gossipChannel) HandleMessage(msg comm.ReceivedMessage) {
}
gc.blocksPuller.HandleMessage(msg)
}

if m.IsLeadershipMsg() {
// Handling leadership message
added := gc.leaderMsgStore.Add(m)
if added {
gc.DeMultiplex(m)
}

}
}

func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) {
Expand Down
21 changes: 19 additions & 2 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) {
var blocks []*proto.GossipMessage
var stateInfoMsgs []*proto.GossipMessage
var orgMsgs []*proto.GossipMessage
var leadershipMsgs []*proto.GossipMessage

isABlock := func(o interface{}) bool {
return o.(*proto.GossipMessage).IsDataMsg()
Expand All @@ -401,6 +402,9 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) {
isOrgRestricted := func(o interface{}) bool {
return o.(*proto.GossipMessage).IsOrgRestricted()
}
isLeadershipMsg := func(o interface{}) bool {
return o.(*proto.GossipMessage).IsLeadershipMsg()
}

// Gossip blocks
blocks, msgs = partitionMessages(isABlock, msgs)
Expand All @@ -414,6 +418,12 @@ func (g *gossipServiceImpl) gossipBatch(msgs []*proto.GossipMessage) {
return gc.IsMemberInChan
})

//Gossip Leadership messages
leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs)
g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter {
return filter.CombineRoutingFilters(gc.IsSubscribed, gc.IsMemberInChan, g.isInMyorg)
})

// Gossip messages restricted to our org
orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs)
peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg)
Expand Down Expand Up @@ -451,10 +461,17 @@ func (g *gossipServiceImpl) gossipInChan(messages []*proto.GossipMessage, chanRo
continue
}
// Select the peers to send the messages to
peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), chanRoutingFactory(gc))
// For leadership messages we will select all peers that pass routing factory - e.g. all peers in channel and org
membership := g.disc.GetMembership()
allPeersInCh := filter.SelectPeers(len(membership), membership, chanRoutingFactory(gc))
peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, membership, chanRoutingFactory(gc))
// Send the messages to the remote peers
for _, msg := range messagesOfChannel {
g.comm.Send(msg, peers2Send...)
if msg.IsLeadershipMsg() {
g.comm.Send(msg, allPeersInCh...)
} else {
g.comm.Send(msg, peers2Send...)
}
}
}
}
Expand Down
133 changes: 133 additions & 0 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func acceptData(m interface{}) bool {
return false
}

func acceptLeadershp(message interface{}) bool {
validMsg := message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG &&
message.(*proto.GossipMessage).IsLeadershipMsg()

return validMsg
}

type joinChanMsg struct {
anchorPeers []api.AnchorPeer
}
Expand Down Expand Up @@ -489,6 +496,34 @@ func TestDissemination(t *testing.T) {
for i := 0; i < n; i++ {
assert.Equal(t, msgsCount2Send, receivedMessages[i])
}

//Sending leadership messages
receivedLeadershipMessages := make([]int, n)
wgLeadership := sync.WaitGroup{}
wgLeadership.Add(n)
for i := 1; i <= n; i++ {
leadershipChan, _ := peers[i-1].Accept(acceptLeadershp, false)
go func(index int, ch <-chan *proto.GossipMessage) {
defer wgLeadership.Done()
<-ch
receivedLeadershipMessages[index]++
}(i-1, leadershipChan)
}

seqNum := 0
incTime := uint64(time.Now().UnixNano())
t3 := time.Now()

leadershipMsg := createLeadershipMsg(true, common.ChainID("A"), incTime, uint64(seqNum), boot.(*gossipServiceImpl).conf.SelfEndpoint, boot.(*gossipServiceImpl).comm.GetPKIid())
boot.Gossip(leadershipMsg)

waitUntilOrFailBlocking(t, wgLeadership.Wait)
t.Log("Leadership message dissemination took", time.Since(t3))

for i := 0; i < n; i++ {
assert.Equal(t, 1, receivedLeadershipMessages[i])
}

t.Log("Stopping peers")

stop := func() {
Expand Down Expand Up @@ -784,6 +819,78 @@ func TestEndedGoroutines(t *testing.T) {
ensureGoroutineExit(t)
}

//func TestLeadershipMsgDissemination(t *testing.T) {
// t.Parallel()
// portPrefix := 3610
// t1 := time.Now()
// // Scenario: 20 nodes and a bootstrap node.
// // The bootstrap node sends 10 leadership messages and we count
// // that each node got 10 messages after a few seconds
//
// stopped := int32(0)
// go waitForTestCompletion(&stopped, t)
//
// n := 10
// msgsCount2Send := 10
// boot := newGossipInstance(portPrefix, 0, 100)
// boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
// boot.UpdateChannelMetadata([]byte{}, common.ChainID("A"))
//
// peers := make([]Gossip, n)
// receivedMessages := make([]int, n)
// wg := sync.WaitGroup{}
// wg.Add(n)
// for i := 1; i <= n; i++ {
// pI := newGossipInstance(portPrefix, i, 100, 0)
// peers[i-1] = pI
// pI.JoinChan(&joinChanMsg{}, common.ChainID("A"))
// pI.UpdateChannelMetadata([]byte{}, common.ChainID("A"))
// acceptChan, _ := pI.Accept(acceptLeadershp, false)
// go func(index int, ch <-chan *proto.GossipMessage) {
// defer wg.Done()
// for j := 0; j < msgsCount2Send; j++ {
// <-ch
// receivedMessages[index]++
// }
// }(i-1, acceptChan)
// }
//
// membershipTime := time.Now()
// waitUntilOrFail(t, checkPeersMembership(peers, n))
// t.Log("Membership establishment took", time.Since(membershipTime))
//
// seqNum := 0
// incTime := uint64(time.Now().UnixNano())
//
// for i := 1; i <= msgsCount2Send; i++ {
// seqNum++
// leadershipMsg := createLeadershipMsg(true, common.ChainID("A"), incTime, uint64(seqNum), boot.(*gossipServiceImpl).conf.SelfEndpoint, boot.(*gossipServiceImpl).comm.GetPKIid())
// boot.Gossip(leadershipMsg)
// time.Sleep(time.Duration(500) * time.Millisecond)
// }
//
// t2 := time.Now()
// waitUntilOrFailBlocking(t, wg.Wait)
// t.Log("Leadership message dissemination took", time.Since(t2))
//
// for i := 0; i < n; i++ {
// assert.Equal(t, msgsCount2Send, receivedMessages[i])
// }
// t.Log("Stopping peers")
//
// stop := func() {
// stopPeers(append(peers, boot))
// }
//
// stopTime := time.Now()
// waitUntilOrFailBlocking(t, stop)
// t.Log("Stop took", time.Since(stopTime))
// t.Log("Took", time.Since(t1))
// atomic.StoreInt32(&stopped, int32(1))
// fmt.Println("<<<TestLeadershipMsgDissemination>>>")
// testWG.Done()
//}

func createDataMsg(seqnum uint64, data []byte, hash string, channel common.ChainID) *proto.GossipMessage {
return &proto.GossipMessage{
Channel: []byte(channel),
Expand All @@ -801,6 +908,32 @@ func createDataMsg(seqnum uint64, data []byte, hash string, channel common.Chain
}
}

func createLeadershipMsg(isDeclaration bool, channel common.ChainID, incTime uint64, seqNum uint64, endpoint string, pkiid []byte) *proto.GossipMessage {

metadata := []byte{}
metadata = strconv.AppendBool(metadata, isDeclaration)

leadershipMsg := &proto.LeadershipMessage{
Membership: &proto.Member{
PkiID: pkiid,
Endpoint: endpoint,
Metadata: metadata,
},
Timestamp: &proto.PeerTime{
IncNumber: incTime,
SeqNum: seqNum,
},
}

msg := &proto.GossipMessage{
Nonce: 0,
Tag: proto.GossipMessage_CHAN_AND_ORG,
Content: &proto.GossipMessage_LeadershipMsg{LeadershipMsg: leadershipMsg},
Channel: channel,
}
return msg
}

type goroutinePredicate func(g goroutine) bool

var connectionLeak = func(g goroutine) bool {
Expand Down
19 changes: 19 additions & 0 deletions gossip/proto/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (mc *msgComparator) invalidationPolicy(this interface{}, that interface{})
return mc.identityInvalidationPolicy(thisMsg.GetPeerIdentity(), thatMsg.GetPeerIdentity())
}

if thisMsg.IsLeadershipMsg() && thatMsg.IsLeadershipMsg() {
return leaderInvalidationPolicy(thisMsg.GetLeadershipMsg(), thatMsg.GetLeadershipMsg())
}

return common.MessageNoAction
}

Expand Down Expand Up @@ -106,6 +110,14 @@ func aliveInvalidationPolicy(thisMsg *AliveMessage, thatMsg *AliveMessage) commo
return compareTimestamps(thisMsg.Timestamp, thatMsg.Timestamp)
}

func leaderInvalidationPolicy(thisMsg *LeadershipMessage, thatMsg *LeadershipMessage) common.InvalidationResult {
if !bytes.Equal(thisMsg.Membership.PkiID, thatMsg.Membership.PkiID) {
return common.MessageNoAction
}

return compareTimestamps(thisMsg.Timestamp, thatMsg.Timestamp)
}

func compareTimestamps(thisTS *PeerTime, thatTS *PeerTime) common.InvalidationResult {
if thisTS.IncNumber == thatTS.IncNumber {
if thisTS.SeqNum > thatTS.SeqNum {
Expand Down Expand Up @@ -279,6 +291,13 @@ func (m *GossipMessage) IsTagLegal() error {
return nil
}

if m.IsLeadershipMsg() {
if m.Tag != GossipMessage_CHAN_AND_ORG {
return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_CHAN_AND_ORG)])
}
return nil
}

return fmt.Errorf("Unknown message type: %v", m)
}

Expand Down

0 comments on commit a429da3

Please sign in to comment.