Skip to content

Commit e2375ff

Browse files
committed
[FAB-6063] Make peers ignore those left the channel
This commit adds a left channel flag to the stateInfo message's properties so that peers would ignore these peers. Change-Id: I55e281bf1bd933223da0c8ab253584901e22f6e9 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent b48cea6 commit e2375ff

File tree

9 files changed

+300
-110
lines changed

9 files changed

+300
-110
lines changed

gossip/gossip/channel/channel.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ type GossipChannel interface {
7676
// that are eligible to be in the channel
7777
ConfigureChannel(joinMsg api.JoinChannelMessage)
7878

79+
// LeaveChannel makes the peer leave the channel
80+
LeaveChannel()
81+
7982
// Stop stops the channel's activity
8083
Stop()
8184
}
@@ -134,6 +137,7 @@ type gossipChannel struct {
134137
stateInfoRequestScheduler *time.Ticker
135138
memFilter *membershipFilter
136139
ledgerHeight uint64
140+
leftChannel int32
137141
}
138142

139143
type membershipFilter struct {
@@ -143,6 +147,9 @@ type membershipFilter struct {
143147

144148
// GetMembership returns the known alive peers and their information
145149
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
150+
if mf.hasLeftChannel() {
151+
return nil
152+
}
146153
var members []discovery.NetworkMember
147154
for _, mem := range mf.adapter.GetMembership() {
148155
if mf.eligibleForChannelAndSameOrg(mem) {
@@ -269,9 +276,21 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
269276
}
270277
}
271278

279+
// LeaveChannel makes the peer leave the channel
280+
func (gc *gossipChannel) LeaveChannel() {
281+
atomic.StoreInt32(&gc.leftChannel, 1)
282+
}
283+
284+
func (gc *gossipChannel) hasLeftChannel() bool {
285+
return atomic.LoadInt32(&gc.leftChannel) == 1
286+
}
287+
272288
// GetPeers returns a list of peers with metadata as published by them
273289
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
274290
members := []discovery.NetworkMember{}
291+
if gc.hasLeftChannel() {
292+
return members
293+
}
275294

276295
for _, member := range gc.GetMembership() {
277296
if !gc.EligibleForChannel(member) {
@@ -281,6 +300,10 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
281300
if stateInf == nil {
282301
continue
283302
}
303+
props := stateInf.GetStateInfo().Properties
304+
if props != nil && props.LeftChannel {
305+
continue
306+
}
284307
member.Metadata = stateInf.GetStateInfo().Metadata
285308
member.Properties = stateInf.GetStateInfo().Properties
286309
members = append(members, member)
@@ -530,6 +553,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
530553
}
531554

532555
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
556+
if gc.hasLeftChannel() {
557+
gc.logger.Info("Received Pull message from", msg.GetConnectionInfo().Endpoint, "but left the channel", string(gc.chainID))
558+
return
559+
}
533560
// If we don't have a StateInfo message from the peer,
534561
// no way of validating its eligibility in the channel.
535562
if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {

gossip/gossip/channel/channel_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,81 @@ func TestMsgStoreNotExpire(t *testing.T) {
374374
assert.Len(t, c, 2)
375375
}
376376

377+
func TestLeaveChannel(t *testing.T) {
378+
// Scenario: Have our peer receive a stateInfo message
379+
// from a peer that has left the channel, and ensure that it skips it
380+
// when returning membership.
381+
// Next, have our own peer leave the channel and ensure:
382+
// 1) It doesn't return any members of the channel when queried
383+
// 2) It doesn't send anymore pull for blocks
384+
// 3) When asked for pull for blocks, it ignores the request
385+
t.Parallel()
386+
387+
jcm := &joinChanMsg{
388+
members2AnchorPeers: map[string][]api.AnchorPeer{
389+
"ORG1": {},
390+
"ORG2": {},
391+
},
392+
}
393+
394+
cs := &cryptoService{}
395+
cs.On("VerifyBlock", mock.Anything).Return(nil)
396+
adapter := new(gossipAdapterMock)
397+
adapter.On("Gossip", mock.Anything)
398+
adapter.On("DeMultiplex", mock.Anything)
399+
members := []discovery.NetworkMember{
400+
{PKIid: pkiIDInOrg1},
401+
{PKIid: pkiIDinOrg2},
402+
}
403+
var helloPullWG sync.WaitGroup
404+
helloPullWG.Add(1)
405+
configureAdapter(adapter, members...)
406+
gc := NewGossipChannel(common.PKIidType("p0"), orgInChannelA, cs, channelA, adapter, jcm)
407+
adapter.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
408+
msg := arguments.Get(0).(*proto.SignedGossipMessage)
409+
if msg.IsPullMsg() {
410+
helloPullWG.Done()
411+
assert.False(t, gc.(*gossipChannel).hasLeftChannel())
412+
}
413+
})
414+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
415+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDinOrg2, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
416+
// Have some peer send a block to us, so we can send some peer a digest when hello is sent to us
417+
gc.HandleMessage(&receivedMsg{msg: createDataMsg(2, channelA), PKIID: pkiIDInOrg1})
418+
assert.Len(t, gc.GetPeers(), 2)
419+
// Now, have peer in org2 "leave the channel" by publishing is an update
420+
stateInfoMsg := &receivedMsg{PKIID: pkiIDinOrg2, msg: createStateInfoMsg(0, pkiIDinOrg2, channelA)}
421+
stateInfoMsg.GetGossipMessage().GetStateInfo().Properties.LeftChannel = true
422+
gc.HandleMessage(stateInfoMsg)
423+
assert.Len(t, gc.GetPeers(), 1)
424+
// Ensure peer in org1 remained and peer in org2 is skipped
425+
assert.Equal(t, pkiIDInOrg1, gc.GetPeers()[0].PKIid)
426+
var digestSendTime int32
427+
var DigestSentWg sync.WaitGroup
428+
DigestSentWg.Add(1)
429+
hello := createHelloMsg(pkiIDInOrg1)
430+
hello.On("Respond", mock.Anything).Run(func(arguments mock.Arguments) {
431+
atomic.AddInt32(&digestSendTime, 1)
432+
// Ensure we only respond with digest before we leave the channel
433+
assert.Equal(t, int32(1), atomic.LoadInt32(&digestSendTime))
434+
DigestSentWg.Done()
435+
})
436+
// Wait until we send a hello pull message
437+
helloPullWG.Wait()
438+
go gc.HandleMessage(hello)
439+
DigestSentWg.Wait()
440+
// Make the peer leave the channel
441+
gc.LeaveChannel()
442+
// Send another hello. Shouldn't respond
443+
go gc.HandleMessage(hello)
444+
// Ensure it doesn't know now any other peer
445+
assert.Len(t, gc.GetPeers(), 0)
446+
// Sleep 3 times the pull interval.
447+
// we're not supposed to send a pull during this time.
448+
time.Sleep(conf.PullInterval * 3)
449+
450+
}
451+
377452
func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
378453
t.Parallel()
379454
ledgerHeight := 5

gossip/gossip/gossip.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ type Gossip interface {
5858
// JoinChan makes the Gossip instance join a channel
5959
JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID)
6060

61+
// LeaveChan makes the Gossip instance leave a channel.
62+
// It still disseminates stateInfo message, but doesn't participate
63+
// in block pulling anymore, and can't return anymore a list of peers
64+
// in the channel.
65+
LeaveChan(chainID common.ChainID)
66+
6167
// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
6268
// any connections to peers with identities that are found invalid
6369
SuspectPeers(s api.PeerSuspector)

gossip/gossip/gossip_impl.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,22 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
188188
}
189189
}
190190

191+
func (g *gossipServiceImpl) LeaveChan(chainID common.ChainID) {
192+
gc := g.chanState.getGossipChannelByChainID(chainID)
193+
if gc == nil {
194+
g.logger.Debug("No such channel", chainID)
195+
return
196+
}
197+
b, _ := (&common.NodeMetastate{}).Bytes()
198+
stateInfMsg, err := g.createStateInfoMsg(b, chainID, true)
199+
if err != nil {
200+
g.logger.Errorf("Failed creating StateInfo message: %+v", errors.WithStack(err))
201+
return
202+
}
203+
gc.UpdateStateInfo(stateInfMsg)
204+
gc.LeaveChannel()
205+
}
206+
191207
// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
192208
// any connections to peers with identities that are found invalid
193209
func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
@@ -733,7 +749,7 @@ func (g *gossipServiceImpl) UpdateChannelMetadata(md []byte, chainID common.Chai
733749
g.logger.Debug("No such channel", chainID)
734750
return
735751
}
736-
stateInfMsg, err := g.createStateInfoMsg(md, chainID)
752+
stateInfMsg, err := g.createStateInfoMsg(md, chainID, false)
737753
if err != nil {
738754
g.logger.Errorf("Failed creating StateInfo message: %+v", errors.WithStack(err))
739755
return
@@ -1088,7 +1104,7 @@ func (g *gossipServiceImpl) connect2BootstrapPeers() {
10881104

10891105
}
10901106

1091-
func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID) (*proto.SignedGossipMessage, error) {
1107+
func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.ChainID, leftChannel bool) (*proto.SignedGossipMessage, error) {
10921108
metaState, err := common.FromBytes(metadata)
10931109
if err != nil {
10941110
return nil, err
@@ -1106,6 +1122,9 @@ func (g *gossipServiceImpl) createStateInfoMsg(metadata []byte, chainID common.C
11061122
LedgerHeight: metaState.LedgerHeight,
11071123
},
11081124
}
1125+
if leftChannel {
1126+
stateInfMsg.Properties.LeftChannel = true
1127+
}
11091128
m := &proto.GossipMessage{
11101129
Nonce: 0,
11111130
Tag: proto.GossipMessage_CHAN_OR_ORG,

gossip/gossip/gossip_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ var tests = []func(t *testing.T){
4242
TestMembershipConvergence,
4343
TestMembershipRequestSpoofing,
4444
TestDataLeakage,
45+
TestLeaveChannel,
4546
//TestDisseminateAll2All: {},
4647
TestIdentityExpiration,
4748
TestSendByCriteria,
@@ -272,6 +273,50 @@ func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot
272273
return g
273274
}
274275

276+
func TestLeaveChannel(t *testing.T) {
277+
t.Parallel()
278+
defer testWG.Done()
279+
portPrefix := 4500
280+
// Scenario: Have 3 peers in a channel and make one of them leave it.
281+
// Ensure the peers don't recognize the other peer when it left the channel
282+
283+
p0 := newGossipInstance(portPrefix, 0, 100, 2)
284+
p0.JoinChan(&joinChanMsg{}, common.ChainID("A"))
285+
p0.UpdateChannelMetadata(createMetadata(1), common.ChainID("A"))
286+
defer p0.Stop()
287+
288+
p1 := newGossipInstance(portPrefix, 1, 100, 0)
289+
p1.JoinChan(&joinChanMsg{}, common.ChainID("A"))
290+
p1.UpdateChannelMetadata(createMetadata(1), common.ChainID("A"))
291+
defer p1.Stop()
292+
293+
p2 := newGossipInstance(portPrefix, 2, 100, 1)
294+
p2.JoinChan(&joinChanMsg{}, common.ChainID("A"))
295+
p2.UpdateChannelMetadata(createMetadata(1), common.ChainID("A"))
296+
defer p2.Stop()
297+
298+
countMembership := func(g Gossip, expected int) func() bool {
299+
return func() bool {
300+
peers := g.PeersOfChannel(common.ChainID("A"))
301+
return len(peers) == expected
302+
}
303+
}
304+
305+
// Wait until everyone sees each other in the channel
306+
waitUntilOrFail(t, countMembership(p0, 2))
307+
waitUntilOrFail(t, countMembership(p1, 2))
308+
waitUntilOrFail(t, countMembership(p2, 2))
309+
310+
// Now p2 leaves the channel
311+
p2.LeaveChan(common.ChainID("A"))
312+
313+
// Ensure channel membership is adjusted accordingly
314+
waitUntilOrFail(t, countMembership(p0, 1))
315+
waitUntilOrFail(t, countMembership(p1, 1))
316+
waitUntilOrFail(t, countMembership(p2, 0))
317+
318+
}
319+
275320
func TestPull(t *testing.T) {
276321
t.Parallel()
277322
defer testWG.Done()

gossip/service/join_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func (g *gossipMock) JoinChan(joinMsg api.JoinChannelMessage, chainID common.Cha
8080
g.Called(joinMsg, chainID)
8181
}
8282

83+
func (g *gossipMock) LeaveChan(chainID common.ChainID) {
84+
panic("implement me")
85+
}
86+
8387
func (*gossipMock) Stop() {
8488
panic("implement me")
8589
}

gossip/state/mocks/gossip.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ func (g *GossipMock) SuspectPeers(s api.PeerSuspector) {
3030

3131
}
3232

33+
func (g *GossipMock) LeaveChan(_ common.ChainID) {
34+
panic("implement me")
35+
}
36+
3337
func (g *GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
3438
g.Called(msg, peers)
3539
}

0 commit comments

Comments
 (0)