Skip to content

Commit

Permalink
Merge pull request ethereum#70 from ethersphere/network-testing-frame…
Browse files Browse the repository at this point in the history
…work-exec-adapter

p2p/adapters: Add "exec" node adapter
  • Loading branch information
zelig authored Apr 24, 2017
2 parents f591fa7 + f2c7ac2 commit 38e1630
Show file tree
Hide file tree
Showing 14 changed files with 768 additions and 60 deletions.
246 changes: 246 additions & 0 deletions p2p/adapters/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package adapters

import (
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/docker/docker/pkg/reexec"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
)

// serviceFunc returns a node.ServiceConstructor which can be used to boot
// devp2p nodes
type serviceFunc func(id *NodeId) node.ServiceConstructor

// serviceFuncs is a map of registered services which are used to boot devp2p
// nodes
var serviceFuncs = make(map[string]serviceFunc)

// RegisterService registers the given serviceFunc which can then be used to
// start a devp2p node with the given name
func RegisterService(name string, f serviceFunc) {
if _, exists := serviceFuncs[name]; exists {
panic(fmt.Sprintf("node service already exists: %q", name))
}
serviceFuncs[name] = f
}

// ExecNode is a NodeAdapter which starts the node by exec'ing the current
// binary and running a registered serviceFunc
type ExecNode struct {
ID *NodeId
Service string
Dir string
Config *node.Config
Cmd *exec.Cmd
Client *rpc.Client
Info *p2p.NodeInfo
}

// NewExecNode creates a new ExecNode which will run the given service using a
// sub-directory of the given baseDir
func NewExecNode(id *NodeId, service, baseDir string) (*ExecNode, error) {
if _, exists := serviceFuncs[service]; !exists {
return nil, fmt.Errorf("unknown node service %q", service)
}

// create the node directory using the first 12 characters of the ID
dir := filepath.Join(baseDir, id.String()[0:12])
if err := os.Mkdir(dir, 0755); err != nil {
return nil, fmt.Errorf("error creating node directory: %s", err)
}

// generate the config
conf := node.DefaultConfig
conf.DataDir = filepath.Join(dir, "data")
conf.IPCPath = filepath.Join(dir, "ipc.sock")
conf.P2P.ListenAddr = "127.0.0.1:0"
conf.P2P.NoDiscovery = true
conf.P2P.NAT = nil

return &ExecNode{
ID: id,
Service: service,
Dir: dir,
Config: &conf,
}, nil
}

// Addr returns the node's enode URL
func (n *ExecNode) Addr() []byte {
if n.Info == nil {
return nil
}
return []byte(n.Info.Enode)
}

// Start exec's the node passing the ID and service as command line arguments
// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
// variable
func (n *ExecNode) Start() (err error) {
if n.Cmd != nil {
return errors.New("already started")
}
defer func() {
if err != nil {
log.Error("node failed to start", "err", err)
n.Stop()
}
}()

// encode the config
conf, err := json.Marshal(n.Config)
if err != nil {
return fmt.Errorf("error generating node config: %s", err)
}

// start the node
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: []string{"p2p-node", n.Service, n.ID.String()},
Stdout: os.Stdout,
Stderr: os.Stderr,
Env: append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", conf)),
}
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err)
}
n.Cmd = cmd

// create the RPC client
for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(50 * time.Millisecond) {
n.Client, err = rpc.Dial(n.Config.IPCPath)
if err == nil {
break
}
}
if n.Client == nil {
return fmt.Errorf("error creating IPC client: %s", err)
}

// load info
var info p2p.NodeInfo
if err := n.Client.Call(&info, "admin_nodeInfo"); err != nil {
return fmt.Errorf("error getting node info: %s", err)
}
n.Info = &info

return nil
}

// Stop stops the node by first sending SIGTERM and then SIGKILL if the node
// doesn't stop within 5s
func (n *ExecNode) Stop() error {
if n.Cmd == nil {
return nil
}
defer func() {
n.Cmd = nil
}()

if n.Client != nil {
n.Client.Close()
n.Client = nil
n.Info = nil
}

if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
return n.Cmd.Process.Kill()
}
waitErr := make(chan error)
go func() {
waitErr <- n.Cmd.Wait()
}()
select {
case err := <-waitErr:
return err
case <-time.After(5 * time.Second):
return n.Cmd.Process.Kill()
}
}

// Connect connects the node to the given addr by calling the Admin.AddPeer
// IPC method
func (n *ExecNode) Connect(addr []byte) error {
if n.Client == nil {
return errors.New("node not started")
}
return n.Client.Call(nil, "admin_addPeer", string(addr))
}

// Disconnect disconnects the node from the given addr by calling the
// Admin.RemovePeer IPC method
func (n *ExecNode) Disconnect(addr []byte) error {
if n.Client == nil {
return errors.New("node not started")
}
return n.Client.Call(nil, "admin_removePeer", string(addr))
}

func init() {
// register a reexec function to start a devp2p node when the current
// binary is executed as "p2p-node"
reexec.Register("p2p-node", execP2PNode)
}

// execP2PNode starts a devp2p node when the current binary is executed with
// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
// and the node config from the _P2P_NODE_CONFIG environment variable
func execP2PNode() {
glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
glogger.Verbosity(log.LvlInfo)
log.Root().SetHandler(glogger)

// read the service and ID from argv
serviceName := os.Args[1]
id := NewNodeIdFromHex(os.Args[2])

// decode the config
confEnv := os.Getenv("_P2P_NODE_CONFIG")
if confEnv == "" {
log.Crit("missing _P2P_NODE_CONFIG")
}
var conf node.Config
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
}

// lookup the service constructor
service, exists := serviceFuncs[serviceName]
if !exists {
log.Crit(fmt.Sprintf("unknown node service %q", serviceName))
}

// start the devp2p stack
stack, err := node.New(&conf)
if err != nil {
log.Crit("error creating node", "err", err)
}
if err := stack.Register(service(id)); err != nil {
log.Crit("error registering service", "err", err)
}
if err := stack.Start(); err != nil {
log.Crit("error starting node", "err", err)
}

go func() {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
defer signal.Stop(sigc)
<-sigc
log.Info("Received SIGTERM, shutting down...")
stack.Stop()
}()

stack.Wait()
}
10 changes: 7 additions & 3 deletions p2p/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ func NewSimNode(id *NodeId, n Network) *SimNode {
}
}

func (self *SimNode) LocalAddr() []byte {
func (self *SimNode) Addr() []byte {
return self.Id.Bytes()
}

func (self *SimNode) ParseAddr(p []byte, s string) ([]byte, error) {
return p, nil
func (self *SimNode) Start() error {
return nil
}

func (self *SimNode) Stop() error {
return nil
}

func (self *SimNode) GetPeer(id *NodeId) *Peer {
Expand Down
15 changes: 6 additions & 9 deletions p2p/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (self *NodeId) MarshalJSON() (out []byte, err error) {

func (self *NodeId) UnmarshalJSON(value []byte) error {
s := string(value)
h, err := discover.HexID(s)
h, err := discover.HexID(s[1 : len(s)-1])
if err != nil {
return err
}
Expand All @@ -64,20 +64,17 @@ func (self *NodeId) Label() string {
}

type NodeAdapter interface {
Connect([]byte) error
Disconnect([]byte) error
// Disconnect(*p2p.Peer, p2p.MsgReadWriter)
Addr() []byte
Start() error
Stop() error
Connect(addr []byte) error
Disconnect(addr []byte) error
}

type ProtocolRunner interface {
RunProtocol(id *NodeId, rw, rrw p2p.MsgReadWriter, p *Peer) error
}

type StartAdapter interface {
Start() error
Stop() error
}

type Reporter interface {
DidConnect(*NodeId, *NodeId) error
DidDisconnect(*NodeId, *NodeId) error
Expand Down
41 changes: 22 additions & 19 deletions p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,6 @@ func (self *Network) NewNode(conf *NodeConfig) error {
func (self *Network) Config() *NetworkConfig {
return self.conf
}
func (self *Network) NewSimNode(conf *NodeConfig) adapters.NodeAdapter {
id := conf.Id
na := adapters.NewSimNode(id, self)
return na
}

// newConn adds a new connection to the network
// it errors if the respective nodes do not exist
Expand Down Expand Up @@ -469,12 +464,8 @@ func (self *Network) Start(id *adapters.NodeId) error {
return fmt.Errorf("node %v already up", id)
}
log.Trace(fmt.Sprintf("starting node %v: %v adapter %v", id, node.Up, node.Adapter()))
sa, ok := node.Adapter().(adapters.StartAdapter)
if ok {
err := sa.Start()
if err != nil {
return err
}
if err := node.Adapter().Start(); err != nil {
return err
}
node.Up = true
log.Info(fmt.Sprintf("started node %v: %v", id, node.Up))
Expand All @@ -496,12 +487,8 @@ func (self *Network) Stop(id *adapters.NodeId) error {
if !node.Up {
return fmt.Errorf("node %v already down", id)
}
sa, ok := node.Adapter().(adapters.StartAdapter)
if ok {
err := sa.Stop()
if err != nil {
return err
}
if err := node.Adapter().Stop(); err != nil {
return err
}
node.Up = false
log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up))
Expand Down Expand Up @@ -539,9 +526,9 @@ func (self *Network) Connect(oneId, otherId *adapters.NodeId) error {
// to this method with connect = false to avoid infinite recursion
// this is not relevant for nodes starting up (which can only be externally triggered)
if rev {
err = conn.other.na.Connect(oneId.Bytes())
err = conn.other.na.Connect(conn.one.na.Addr())
} else {
err = conn.one.na.Connect(otherId.Bytes())
err = conn.one.na.Connect(conn.other.na.Addr())
}
if err != nil {
return err
Expand Down Expand Up @@ -679,3 +666,19 @@ func (self *Network) getConn(oneId, otherId *adapters.NodeId) *Conn {
}
return self.Conns[i]
}

func (self *Network) Shutdown() {
// disconnect all nodes
for _, conn := range self.Conns {
if err := self.Disconnect(conn.One, conn.Other); err != nil {
log.Warn(fmt.Sprintf("error disconnecting %s from %s", conn.One.Label(), conn.Other.Label()), "err", err)
}
}

// stop all nodes
for _, node := range self.Nodes {
if err := node.na.Stop(); err != nil {
log.Warn(fmt.Sprintf("error stopping node %s", node.Id.Label()), "err", err)
}
}
}
Loading

0 comments on commit 38e1630

Please sign in to comment.