diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 2e8a758ce70b..a4f40b44f37b 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -213,6 +213,12 @@ func (self *Hive) Stop() { close(self.quit) } +func (self *Hive) Healthy() bool { + // TODO: determine if we have enough peers to consider the network + // to be healthy + return true +} + func (self *Hive) wake() { select { case self.more <- true: diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go index 2aa58e4a6c02..5fb4ec975054 100644 --- a/swarm/network/simulations/discovery/discovery_test.go +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -43,22 +43,12 @@ func TestMain(m *testing.M) { } func TestDiscoverySimulationDockerAdapter(t *testing.T) { - setup := func(net *simulations.Network, trigger chan *adapters.NodeId) { - var ids []*adapters.NodeId - - // TODO: get events from the devp2p node - time.AfterFunc(10*time.Second, func() { - for _, id := range ids { - trigger <- id - } - }) - + setup := func(net *simulations.Network) { net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter { node, err := adapters.NewDockerNode(conf.Id, conf.PrivateKey, serviceName) if err != nil { panic(err) } - ids = append(ids, conf.Id) return node }) } @@ -73,22 +63,12 @@ func TestDiscoverySimulationExecAdapter(t *testing.T) { } defer os.RemoveAll(baseDir) - setup := func(net *simulations.Network, trigger chan *adapters.NodeId) { - var ids []*adapters.NodeId - - // TODO: get events from the devp2p node - time.AfterFunc(10*time.Second, func() { - for _, id := range ids { - trigger <- id - } - }) - + setup := func(net *simulations.Network) { net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter { node, err := adapters.NewExecNode(conf.Id, conf.PrivateKey, serviceName, baseDir) if err != nil { panic(err) } - ids = append(ids, conf.Id) return node }) } @@ -97,27 +77,25 @@ func TestDiscoverySimulationExecAdapter(t *testing.T) { } func TestDiscoverySimulationSimAdapter(t *testing.T) { - setup := func(net *simulations.Network, trigger chan *adapters.NodeId) { + setup := func(net *simulations.Network) { net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter { - node := newNode(conf.Id) - node.trigger = trigger - return adapters.NewSimNode(conf.Id, node, net) + return adapters.NewSimNode(conf.Id, newNode(conf.Id), net) }) } testDiscoverySimulation(t, setup) } -func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network, trigger chan *adapters.NodeId)) { +func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network)) { // create 10 node network nodeCount := 10 - trigger := make(chan *adapters.NodeId) net := simulations.NewNetwork(&simulations.NetworkConfig{ Id: "0", Backend: true, }) defer net.Shutdown() - setup(net, trigger) + setup(net) + trigger := make(chan *adapters.NodeId) ids := make([]*adapters.NodeId, nodeCount) for i := 0; i < nodeCount; i++ { conf, err := net.NewNode() @@ -127,6 +105,9 @@ func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network, if err := net.Start(conf.Id); err != nil { t.Fatalf("error starting node %s: %s", conf.Id.Label(), err) } + if err := triggerChecks(trigger, net, conf.Id); err != nil { + t.Fatal("error triggering checks for node %s: %s", conf.Id.Label(), err) + } ids[i] = conf.Id } @@ -153,15 +134,19 @@ func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network, default: } - node := net.GetNode(id) + node := net.GetNodeAdapter(id) if node == nil { return false, fmt.Errorf("unknown node: %s", id) } - - // TODO: check list of peers - _ = node - - return true, nil + client, err := node.Client() + if err != nil { + return false, fmt.Errorf("error getting node client: %s", err) + } + var healthy bool + if err := client.Call(&healthy, "hive_healthy", nil); err != nil { + return false, fmt.Errorf("error getting node health: %s", err) + } + return healthy, nil } timeout := 30 * time.Second @@ -191,12 +176,42 @@ func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network, } } +// triggerChecks triggers a simulation step check whenever a peer is added or +// removed from the given node +func triggerChecks(trigger chan *adapters.NodeId, net *simulations.Network, id *adapters.NodeId) error { + node := net.GetNodeAdapter(id) + if node == nil { + return fmt.Errorf("unknown node: %s", id) + } + client, err := node.Client() + if err != nil { + return err + } + events := make(chan *p2p.PeerEvent) + sub, err := client.EthSubscribe(context.Background(), events, "peerEvents") + if err != nil { + return fmt.Errorf("error getting peer events for node %v: %s", id, err) + } + go func() { + defer sub.Unsubscribe() + for { + select { + case <-events: + trigger <- id + case err := <-sub.Err(): + if err != nil { + log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err) + } + return + } + } + }() + return nil +} + type node struct { *network.Hive - *adapters.SimNode - id *adapters.NodeId - trigger chan *adapters.NodeId protocol *p2p.Protocol } @@ -205,10 +220,7 @@ func newNode(id *adapters.NodeId) *node { kademlia := newKademlia(addr.OverlayAddr()) hive := newHive(kademlia) codeMap := network.BzzCodeMap(network.DiscoveryMsgs...) - node := &node{ - Hive: hive, - id: id, - } + node := &node{Hive: hive} services := func(peer network.Peer) error { discoveryPeer := network.NewDiscovery(peer, kademlia) node.Add(discoveryPeer) @@ -245,7 +257,11 @@ func (n *node) Protocols() []p2p.Protocol { } func (n *node) APIs() []rpc.API { - return nil + return []rpc.API{{ + Namespace: "hive", + Version: "1.0", + Service: n.Hive, + }} } func (n *node) Start(server p2p.Server) error { @@ -257,22 +273,6 @@ func (n *node) Stop() error { return nil } -func (n *node) Add(peer network.Peer) error { - err := n.Hive.Add(peer) - n.triggerCheck() - return err -} - -func (n *node) Remove(peer network.Peer) { - n.Hive.Remove(peer) - n.triggerCheck() -} - func (n *node) hiveKeepAlive() <-chan time.Time { return time.Tick(time.Second) } - -func (n *node) triggerCheck() { - // TODO: rate limit the trigger? - go func() { n.trigger <- n.id }() -}