Skip to content

Commit

Permalink
Merge pull request ethereum#76 from ethersphere/network-testing-frame…
Browse files Browse the repository at this point in the history
…work-discovery-trigger

swarm/network: Trigger simulations checks on peer events
  • Loading branch information
zelig authored Apr 30, 2017
2 parents 5bc8776 + 323ef86 commit f9dd317
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 59 deletions.
6 changes: 6 additions & 0 deletions swarm/network/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ func (self *Hive) Stop() {
close(self.quit)
}

func (self *Hive) Healthy() bool {
// TODO: determine if we have enough peers to consider the network
// to be healthy
return true
}

func (self *Hive) wake() {
select {
case self.more <- true:
Expand Down
118 changes: 59 additions & 59 deletions swarm/network/simulations/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,12 @@ func TestMain(m *testing.M) {
}

func TestDiscoverySimulationDockerAdapter(t *testing.T) {
setup := func(net *simulations.Network, trigger chan *adapters.NodeId) {
var ids []*adapters.NodeId

// TODO: get events from the devp2p node
time.AfterFunc(10*time.Second, func() {
for _, id := range ids {
trigger <- id
}
})

setup := func(net *simulations.Network) {
net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter {
node, err := adapters.NewDockerNode(conf.Id, conf.PrivateKey, serviceName)
if err != nil {
panic(err)
}
ids = append(ids, conf.Id)
return node
})
}
Expand All @@ -73,22 +63,12 @@ func TestDiscoverySimulationExecAdapter(t *testing.T) {
}
defer os.RemoveAll(baseDir)

setup := func(net *simulations.Network, trigger chan *adapters.NodeId) {
var ids []*adapters.NodeId

// TODO: get events from the devp2p node
time.AfterFunc(10*time.Second, func() {
for _, id := range ids {
trigger <- id
}
})

setup := func(net *simulations.Network) {
net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter {
node, err := adapters.NewExecNode(conf.Id, conf.PrivateKey, serviceName, baseDir)
if err != nil {
panic(err)
}
ids = append(ids, conf.Id)
return node
})
}
Expand All @@ -97,27 +77,25 @@ func TestDiscoverySimulationExecAdapter(t *testing.T) {
}

func TestDiscoverySimulationSimAdapter(t *testing.T) {
setup := func(net *simulations.Network, trigger chan *adapters.NodeId) {
setup := func(net *simulations.Network) {
net.SetNaf(func(conf *simulations.NodeConfig) adapters.NodeAdapter {
node := newNode(conf.Id)
node.trigger = trigger
return adapters.NewSimNode(conf.Id, node, net)
return adapters.NewSimNode(conf.Id, newNode(conf.Id), net)
})
}

testDiscoverySimulation(t, setup)
}

func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network, trigger chan *adapters.NodeId)) {
func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network)) {
// create 10 node network
nodeCount := 10
trigger := make(chan *adapters.NodeId)
net := simulations.NewNetwork(&simulations.NetworkConfig{
Id: "0",
Backend: true,
})
defer net.Shutdown()
setup(net, trigger)
setup(net)
trigger := make(chan *adapters.NodeId)
ids := make([]*adapters.NodeId, nodeCount)
for i := 0; i < nodeCount; i++ {
conf, err := net.NewNode()
Expand All @@ -127,6 +105,9 @@ func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network,
if err := net.Start(conf.Id); err != nil {
t.Fatalf("error starting node %s: %s", conf.Id.Label(), err)
}
if err := triggerChecks(trigger, net, conf.Id); err != nil {
t.Fatal("error triggering checks for node %s: %s", conf.Id.Label(), err)
}
ids[i] = conf.Id
}

Expand All @@ -153,15 +134,19 @@ func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network,
default:
}

node := net.GetNode(id)
node := net.GetNodeAdapter(id)
if node == nil {
return false, fmt.Errorf("unknown node: %s", id)
}

// TODO: check list of peers
_ = node

return true, nil
client, err := node.Client()
if err != nil {
return false, fmt.Errorf("error getting node client: %s", err)
}
var healthy bool
if err := client.Call(&healthy, "hive_healthy", nil); err != nil {
return false, fmt.Errorf("error getting node health: %s", err)
}
return healthy, nil
}

timeout := 30 * time.Second
Expand Down Expand Up @@ -191,12 +176,42 @@ func testDiscoverySimulation(t *testing.T, setup func(net *simulations.Network,
}
}

// triggerChecks triggers a simulation step check whenever a peer is added or
// removed from the given node
func triggerChecks(trigger chan *adapters.NodeId, net *simulations.Network, id *adapters.NodeId) error {
node := net.GetNodeAdapter(id)
if node == nil {
return fmt.Errorf("unknown node: %s", id)
}
client, err := node.Client()
if err != nil {
return err
}
events := make(chan *p2p.PeerEvent)
sub, err := client.EthSubscribe(context.Background(), events, "peerEvents")
if err != nil {
return fmt.Errorf("error getting peer events for node %v: %s", id, err)
}
go func() {
defer sub.Unsubscribe()
for {
select {
case <-events:
trigger <- id
case err := <-sub.Err():
if err != nil {
log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err)
}
return
}
}
}()
return nil
}

type node struct {
*network.Hive
*adapters.SimNode

id *adapters.NodeId
trigger chan *adapters.NodeId
protocol *p2p.Protocol
}

Expand All @@ -205,10 +220,7 @@ func newNode(id *adapters.NodeId) *node {
kademlia := newKademlia(addr.OverlayAddr())
hive := newHive(kademlia)
codeMap := network.BzzCodeMap(network.DiscoveryMsgs...)
node := &node{
Hive: hive,
id: id,
}
node := &node{Hive: hive}
services := func(peer network.Peer) error {
discoveryPeer := network.NewDiscovery(peer, kademlia)
node.Add(discoveryPeer)
Expand Down Expand Up @@ -245,7 +257,11 @@ func (n *node) Protocols() []p2p.Protocol {
}

func (n *node) APIs() []rpc.API {
return nil
return []rpc.API{{
Namespace: "hive",
Version: "1.0",
Service: n.Hive,
}}
}

func (n *node) Start(server p2p.Server) error {
Expand All @@ -257,22 +273,6 @@ func (n *node) Stop() error {
return nil
}

func (n *node) Add(peer network.Peer) error {
err := n.Hive.Add(peer)
n.triggerCheck()
return err
}

func (n *node) Remove(peer network.Peer) {
n.Hive.Remove(peer)
n.triggerCheck()
}

func (n *node) hiveKeepAlive() <-chan time.Time {
return time.Tick(time.Second)
}

func (n *node) triggerCheck() {
// TODO: rate limit the trigger?
go func() { n.trigger <- n.id }()
}

0 comments on commit f9dd317

Please sign in to comment.