diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index b0adf9dbf9d..acd5b095f67 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -80,4 +80,7 @@ type Discovery interface { // InitiateSync makes the instance ask a given number of peers // for their membership information InitiateSync(peerNum int) + + // Connect makes this instance to connect to a remote instance + Connect(NetworkMember) } diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 403f59d3533..335a794baa3 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -133,6 +133,31 @@ func NewDiscoveryService(bootstrapPeers []string, self NetworkMember, comm CommS return d } +func (d *gossipDiscoveryImpl) Connect(member NetworkMember) { + d.logger.Debug("Entering", member) + defer d.logger.Debug("Exiting") + + if member.PKIid == nil { + d.logger.Warning("Empty PkiID, aborting") + return + } + + d.lock.Lock() + defer d.lock.Unlock() + + if _, exists := d.id2Member[string(member.PKIid)]; exists { + d.logger.Info("Member", member, "already known") + return + } + + d.deadLastTS[string(member.PKIid)] = ×tamp{ + incTime: time.Unix(0, 0), + lastSeen: time.Now(), + seqNum: 0, + } + d.id2Member[string(member.PKIid)] = &member +} + func (d *gossipDiscoveryImpl) connect2BootstrapPeers(endpoints []string) { d.logger.Info("Entering:", endpoints) defer d.logger.Info("Exiting") diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index a3aee00e061..da2873fe3ea 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -258,6 +258,28 @@ func bootPeer(port int) string { return fmt.Sprintf("localhost:%d", port) } +func TestConnect(t *testing.T) { + t.Parallel() + nodeNum := 10 + instances := []*gossipInstance{} + for i := 0; i < nodeNum; i++ { + inst := createDiscoveryInstance(7611+i, fmt.Sprintf("d%d", i), []string{}) + instances = append(instances, inst) + j := (i + 1) % 10 + endpoint := fmt.Sprintf("localhost:%d", 7611+j) + netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)} + inst.Connect(netMember2Connect2) + // Check passing nil PKI-ID doesn't crash peer + inst.Connect(NetworkMember{PKIid: nil, Endpoint: endpoint}) + } + + fullMembership := func() bool { + return nodeNum-1 == len(instances[nodeNum-1].GetMembership()) + } + waitUntilOrFail(t, fullMembership) + stopInstances(t, instances) +} + func TestUpdate(t *testing.T) { t.Parallel() nodeNum := 5 diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index e9575d21e9f..d392a2bac28 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -158,6 +158,24 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com return } g.chanState.joinChannel(joinMsg, chainID) + + selfPkiID := g.mcs.GetPKIidOfCert(g.selfIdentity) + for _, ap := range joinMsg.AnchorPeers() { + if ap.Host == "" { + g.logger.Warning("Got empty hostname, skipping connecting to anchor peer", ap) + } + if ap.Port == 0 { + g.logger.Warning("Got invalid port (0), skipping connecting to anchor peer", ap) + } + pkiID := g.mcs.GetPKIidOfCert(ap.Cert) + // Skip connecting to self + if bytes.Equal([]byte(pkiID), []byte(selfPkiID)) { + g.logger.Info("Anchor peer with same PKI-ID, skipping connecting to myself") + continue + } + endpoint := fmt.Sprintf("%s:%d", ap.Host, ap.Port) + g.disc.Connect(discovery.NetworkMember{Endpoint: endpoint, PKIid: pkiID}) + } } func (g *gossipServiceImpl) handlePresumedDead() { diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index 605def70dce..c9e4c36b962 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -47,7 +47,7 @@ func init() { discovery.SetExpirationTimeout(aliveTimeInterval * 10) discovery.SetReconnectInterval(aliveTimeInterval * 5) - testWG.Add(6) + testWG.Add(7) } @@ -63,6 +63,7 @@ func acceptData(m interface{}) bool { } type joinChanMsg struct { + anchorPeers []api.AnchorPeer } // SequenceNumber returns the sequence number of the block this joinChanMsg @@ -72,8 +73,11 @@ func (*joinChanMsg) SequenceNumber() uint64 { } // AnchorPeers returns all the anchor peers that are in the channel -func (*joinChanMsg) AnchorPeers() []api.AnchorPeer { - return []api.AnchorPeer{{Cert: anchorPeerIdentity}} +func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { + if len(jcm.anchorPeers) == 0 { + return []api.AnchorPeer{{Cert: anchorPeerIdentity}} + } + return jcm.anchorPeers } type naiveCryptoService struct { @@ -281,6 +285,60 @@ func TestPull(t *testing.T) { testWG.Done() } +func TestConnectToAnchorPeers(t *testing.T) { + t.Parallel() + portPrefix := 8610 + // Scenario: Spawn 5 peers, and make each of them connect to + // the other 2 using join channel. + stopped := int32(0) + go waitForTestCompletion(&stopped, t) + n := 5 + + jcm := &joinChanMsg{anchorPeers: []api.AnchorPeer{}} + for i := 0; i < n; i++ { + pkiID := fmt.Sprintf("localhost:%d", portPrefix+i) + ap := api.AnchorPeer{ + Port: portPrefix + i, + Host: "localhost", + Cert: []byte(pkiID), + } + jcm.anchorPeers = append(jcm.anchorPeers, ap) + } + + peers := make([]Gossip, n) + wg := sync.WaitGroup{} + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + peers[i] = newGossipInstance(portPrefix, i, 100) + peers[i].JoinChan(jcm, common.ChainID("A")) + peers[i].UpdateChannelMetadata([]byte("bla bla"), common.ChainID("A")) + wg.Done() + }(i) + } + waitUntilOrFailBlocking(t, wg.Wait) + waitUntilOrFail(t, checkPeersMembership(peers, n-1)) + + channelMembership := func() bool { + for _, peer := range peers { + if len(peer.PeersOfChannel(common.ChainID("A"))) != n-1 { + return false + } + } + return true + } + waitUntilOrFail(t, channelMembership) + + stop := func() { + stopPeers(peers) + } + waitUntilOrFailBlocking(t, stop) + + fmt.Println("<<>>") + atomic.StoreInt32(&stopped, int32(1)) + testWG.Done() +} + func TestMembership(t *testing.T) { t.Parallel() portPrefix := 4610