From d2c8fed88257e35926bf175d351b8d4c72f811c3 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Fri, 27 Jan 2017 22:02:04 +0200 Subject: [PATCH] [FAB-1913] Connect to anchor peers at join Channel When the peer joins a channel it gets a list of anchor peers. We need the gossip layer to reach out to these anchor peers and connect to them. - Added 2 tests (one in the discovery module and one in the gossip module) I tested this using the following setup: used docs/docker-compose-channel.yml And added another peer, and didn't give that peer a bootstrap peer, so both peers don't know of each other at startup. Then I created an anchorPeer file with the ip address (172.21.0.4) and port of one of the peers. After that, I created a channel with that anchor peer file, and made the other peer to join the channel. The peer tried contacting the anchor peer (looked at the gossip communication logs) and tried authentication with it: Sending tag:EMPTY signature:"[xxx]" conn: to 172.21.0.4:53637 WARN 213 Remote endpoint claims to be a different peer, expected [PEM FILE] but got [fake PKI-ID I use until MSP is integrated with gossip] This proves that when joinChannel is called upon a peer with suitable anchor peer configuration, the peers try to connect to one another. Signed-off-by: Yacov Manevich Change-Id: Ia033f81eeaf38cb53cb65dc06a01dca07342386b --- gossip/discovery/discovery.go | 3 ++ gossip/discovery/discovery_impl.go | 25 ++++++++++++ gossip/discovery/discovery_test.go | 22 ++++++++++ gossip/gossip/gossip_impl.go | 18 +++++++++ gossip/gossip/gossip_test.go | 64 ++++++++++++++++++++++++++++-- 5 files changed, 129 insertions(+), 3 deletions(-) diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index b0adf9dbf9d..acd5b095f67 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -80,4 +80,7 @@ type Discovery interface { // InitiateSync makes the instance ask a given number of peers // for their membership information InitiateSync(peerNum int) + + // Connect makes this instance to connect to a remote instance + Connect(NetworkMember) } diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 403f59d3533..335a794baa3 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -133,6 +133,31 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS return d } +func (d *gossipDiscoveryImpl) Connect(member NetworkMember) { + d.logger.Debug("Entering", member) + defer d.logger.Debug("Exiting") + + if member.PKIid == nil { + d.logger.Warning("Empty PkiID, aborting") + return + } + + d.lock.Lock() + defer d.lock.Unlock() + + if _, exists := d.id2Member[string(member.PKIid)]; exists { + d.logger.Info("Member", member, "already known") + return + } + + d.deadLastTS[string(member.PKIid)] = ×tamp{ + incTime: time.Unix(0, 0), + lastSeen: time.Now(), + seqNum: 0, + } + d.id2Member[string(member.PKIid)] = &member +} + func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) { d.logger.Info("Entering:", endpoints) defer d.logger.Info("Exiting") diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index a3aee00e061..da2873fe3ea 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -258,6 +258,28 @@ func bootPeer(port int) string { return fmt.Sprintf("localhost:%d", port) } +func TestConnect(t *testing.T) { + t.Parallel() + nodeNum := 10 + instances := []*gossipInstance{} + for i := 0; i < nodeNum; i++ { + inst := createDiscoveryInstance(7611+i, fmt.Sprintf("d%d", i), []string{}) + instances = append(instances, inst) + j := (i + 1) % 10 + endpoint := fmt.Sprintf("localhost:%d", 7611+j) + netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)} + inst.Connect(netMember2Connect2) + // Check passing nil PKI-ID doesn't crash peer + inst.Connect(NetworkMember{PKIid: nil, Endpoint: endpoint}) + } + + fullMembership := func() bool { + return nodeNum-1 == len(instances[nodeNum-1].GetMembership()) + } + waitUntilOrFail(t, fullMembership) + stopInstances(t, instances) +} + func TestUpdate(t *testing.T) { t.Parallel() nodeNum := 5 diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index e9575d21e9f..d392a2bac28 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -158,6 +158,24 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com return } g.chanState.joinChannel(joinMsg, chainID) + + selfPkiID := g.mcs.GetPKIidOfCert(g.selfIdentity) + for _, ap := range joinMsg.AnchorPeers() { + if ap.Host == "" { + g.logger.Warning("Got empty hostname, skipping connecting to anchor peer", ap) + } + if ap.Port == 0 { + g.logger.Warning("Got invalid port (0), skipping connecting to anchor peer", ap) + } + pkiID := g.mcs.GetPKIidOfCert(ap.Cert) + // Skip connecting to self + if bytes.Equal([]byte(pkiID), []byte(selfPkiID)) { + g.logger.Info("Anchor peer with same PKI-ID, skipping connecting to myself") + continue + } + endpoint := fmt.Sprintf("%s:%d", ap.Host, ap.Port) + g.disc.Connect(discovery.NetworkMember{Endpoint: endpoint, PKIid: pkiID}) + } } func (g *gossipServiceImpl) handlePresumedDead() { diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index 605def70dce..c9e4c36b962 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -47,7 +47,7 @@ func init() { discovery.SetExpirationTimeout(aliveTimeInterval * 10) discovery.SetReconnectInterval(aliveTimeInterval * 5) - testWG.Add(6) + testWG.Add(7) } @@ -63,6 +63,7 @@ func acceptData(m interface{}) bool { } type joinChanMsg struct { + anchorPeers []api.AnchorPeer } // SequenceNumber returns the sequence number of the block this joinChanMsg @@ -72,8 +73,11 @@ func (*joinChanMsg) SequenceNumber() uint64 { } // AnchorPeers returns all the anchor peers that are in the channel -func (*joinChanMsg) AnchorPeers() []api.AnchorPeer { - return []api.AnchorPeer{{Cert: anchorPeerIdentity}} +func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { + if len(jcm.anchorPeers) == 0 { + return []api.AnchorPeer{{Cert: anchorPeerIdentity}} + } + return jcm.anchorPeers } type naiveCryptoService struct { @@ -281,6 +285,60 @@ func TestPull(t *testing.T) { testWG.Done() } +func TestConnectToAnchorPeers(t *testing.T) { + t.Parallel() + portPrefix := 8610 + // Scenario: Spawn 5 peers, and make each of them connect to + // the other 2 using join channel. + stopped := int32(0) + go waitForTestCompletion(&stopped, t) + n := 5 + + jcm := &joinChanMsg{anchorPeers: []api.AnchorPeer{}} + for i := 0; i < n; i++ { + pkiID := fmt.Sprintf("localhost:%d", portPrefix+i) + ap := api.AnchorPeer{ + Port: portPrefix + i, + Host: "localhost", + Cert: []byte(pkiID), + } + jcm.anchorPeers = append(jcm.anchorPeers, ap) + } + + peers := make([]Gossip, n) + wg := sync.WaitGroup{} + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + peers[i] = newGossipInstance(portPrefix, i, 100) + peers[i].JoinChan(jcm, common.ChainID("A")) + peers[i].UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A")) + wg.Done() + }(i) + } + waitUntilOrFailBlocking(t, wg.Wait) + waitUntilOrFail(t, checkPeersMembership(peers, n-1)) + + channelMembership := func() bool { + for _, peer := range peers { + if len(peer.PeersOfChannel(common.ChainID("A"))) != n-1 { + return false + } + } + return true + } + waitUntilOrFail(t, channelMembership) + + stop := func() { + stopPeers(peers) + } + waitUntilOrFailBlocking(t, stop) + + fmt.Println("<<>>") + atomic.StoreInt32(&stopped, int32(1)) + testWG.Done() +} + func TestMembership(t *testing.T) { t.Parallel() portPrefix := 4610