Skip to content

Commit

Permalink
p2p/simulations: Snapshot support
Browse files Browse the repository at this point in the history
Signed-off-by: Lewis Marshall <lewis@lmars.net>
  • Loading branch information
lmars committed May 12, 2017
1 parent e39a33e commit d7eb6b5
Show file tree
Hide file tree
Showing 14 changed files with 592 additions and 554 deletions.
23 changes: 12 additions & 11 deletions p2p/simulations/adapters/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,19 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
}

// generate the config
conf := node.DefaultConfig
conf.DataDir = "/data"
conf.P2P.EnableMsgEvents = true
conf.P2P.NoDiscovery = true
conf.P2P.NAT = nil
conf := &execNodeConfig{
Stack: node.DefaultConfig,
Node: config,
}
conf.Stack.DataDir = "/data"
conf.Stack.P2P.EnableMsgEvents = true
conf.Stack.P2P.NoDiscovery = true
conf.Stack.P2P.NAT = nil

node := &DockerNode{
ExecNode: ExecNode{
ID: config.Id,
Service: config.Service,
Config: &conf,
key: config.PrivateKey,
ID: config.Id,
Config: conf,
},
}
node.newCmd = node.dockerCommand
Expand All @@ -80,8 +81,8 @@ func (n *DockerNode) dockerCommand() *exec.Cmd {
return exec.Command(
"sh", "-c",
fmt.Sprintf(
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" --env _P2P_NODE_KEY="${_P2P_NODE_KEY}" %s p2p-node %s %s`,
dockerImage, n.Service, n.ID.String(),
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
dockerImage, n.Config.Node.Service, n.ID.String(),
),
)
}
Expand Down
145 changes: 86 additions & 59 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package adapters
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -18,7 +17,6 @@ import (
"time"

"github.com/docker/docker/pkg/reexec"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -60,22 +58,23 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
}

// generate the config
conf := node.DefaultConfig
conf.DataDir = filepath.Join(dir, "data")
conf.P2P.EnableMsgEvents = true
conf.P2P.NoDiscovery = true
conf.P2P.NAT = nil
conf := &execNodeConfig{
Stack: node.DefaultConfig,
Node: config,
}
conf.Stack.DataDir = filepath.Join(dir, "data")
conf.Stack.P2P.EnableMsgEvents = true
conf.Stack.P2P.NoDiscovery = true
conf.Stack.P2P.NAT = nil

// listen on a random localhost port (we'll get the actual port after
// starting the node through the RPC admin.nodeInfo method)
conf.P2P.ListenAddr = "127.0.0.1:0"
conf.Stack.P2P.ListenAddr = "127.0.0.1:0"

node := &ExecNode{
ID: config.Id,
Service: config.Service,
Dir: dir,
Config: &conf,
key: config.PrivateKey,
ID: config.Id,
Dir: dir,
Config: conf,
}
node.newCmd = node.execCommand
return node, nil
Expand All @@ -89,12 +88,11 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
// (so for example we can run the node in a remote Docker container and
// still communicate with it).
type ExecNode struct {
ID *NodeId
Service string
Dir string
Config *node.Config
Cmd *exec.Cmd
Info *p2p.NodeInfo
ID *NodeId
Dir string
Config *execNodeConfig
Cmd *exec.Cmd
Info *p2p.NodeInfo

client *rpc.Client
rpcMux *rpcMux
Expand All @@ -116,11 +114,10 @@ func (n *ExecNode) Client() (*rpc.Client, error) {
return n.client, nil
}

// Start exec's the node passing the ID and service as command line arguments,
// the node config encoded as JSON in the _P2P_NODE_CONFIG environment
// variable and the node's private key hex-endoded in the _P2P_NODE_KEY
// environment variable
func (n *ExecNode) Start() (err error) {
// Start exec's the node passing the ID and service as command line arguments
// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
// variable
func (n *ExecNode) Start(snapshot []byte) (err error) {
if n.Cmd != nil {
return errors.New("already started")
}
Expand All @@ -131,15 +128,14 @@ func (n *ExecNode) Start() (err error) {
}
}()

// encode the config
conf, err := json.Marshal(n.Config)
// encode a copy of the config containing the snapshot
confCopy := *n.Config
confCopy.Snapshot = snapshot
confData, err := json.Marshal(confCopy)
if err != nil {
return fmt.Errorf("error generating node config: %s", err)
}

// encode the private key
key := hex.EncodeToString(crypto.FromECDSA(n.key))

// create a net.Pipe for RPC communication over stdin / stdout
pipe1, pipe2 := net.Pipe()

Expand All @@ -148,10 +144,7 @@ func (n *ExecNode) Start() (err error) {
cmd.Stdin = pipe1
cmd.Stdout = pipe1
cmd.Stderr = os.Stderr
cmd.Env = append(os.Environ(),
fmt.Sprintf("_P2P_NODE_CONFIG=%s", conf),
fmt.Sprintf("_P2P_NODE_KEY=%s", key),
)
cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData))
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err)
}
Expand All @@ -177,7 +170,7 @@ func (n *ExecNode) Start() (err error) {
func (n *ExecNode) execCommand() *exec.Cmd {
return &exec.Cmd{
Path: reexec.Self(),
Args: []string{"p2p-node", n.Service, n.ID.String()},
Args: []string{"p2p-node", n.Config.Node.Service, n.ID.String()},
}
}

Expand Down Expand Up @@ -214,12 +207,11 @@ func (n *ExecNode) Stop() error {

// NodeInfo returns information about the node
func (n *ExecNode) NodeInfo() *p2p.NodeInfo {
if n.client == nil {
return n.Info
info := &p2p.NodeInfo{
ID: n.ID.String(),
}
info := &p2p.NodeInfo{}
if err := n.client.Call(&info, "admin_nodeInfo"); err != nil {
return n.Info
if n.client != nil {
n.client.Call(&info, "admin_nodeInfo")
}
return info
}
Expand All @@ -234,16 +226,33 @@ func (n *ExecNode) ServeRPC(conn net.Conn) error {
return nil
}

// Snapshot creates a snapshot of the service state by calling the
// simulation_snapshot RPC method
func (n *ExecNode) Snapshot() ([]byte, error) {
if n.client == nil {
return nil, errors.New("RPC not started")
}
var snapshot []byte
return snapshot, n.client.Call(&snapshot, "simulation_snapshot")
}

func init() {
// register a reexec function to start a devp2p node when the current
// binary is executed as "p2p-node"
reexec.Register("p2p-node", execP2PNode)
}

// execNodeConfig is used to serialize the node configuration so it can be
// passed to the child process as a JSON encoded environment variable
type execNodeConfig struct {
Stack node.Config `json:"stack"`
Node *NodeConfig `json:"node"`
Snapshot []byte `json:"snapshot,omitempty"`
}

// execP2PNode starts a devp2p node when the current binary is executed with
// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2],
// the node config from the _P2P_NODE_CONFIG environment variable and the
// private key from the _P2P_NODE_KEY environment variable
// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2]
// and the node config from the _P2P_NODE_CONFIG environment variable
func execP2PNode() {
glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat()))
glogger.Verbosity(log.LvlInfo)
Expand All @@ -258,45 +267,35 @@ func execP2PNode() {
if confEnv == "" {
log.Crit("missing _P2P_NODE_CONFIG")
}
var conf node.Config
var conf execNodeConfig
if err := json.Unmarshal([]byte(confEnv), &conf); err != nil {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
}

// decode the private key
keyEnv := os.Getenv("_P2P_NODE_KEY")
if keyEnv == "" {
log.Crit("missing _P2P_NODE_KEY")
}
key, err := hex.DecodeString(keyEnv)
if err != nil {
log.Crit("error decoding _P2P_NODE_KEY", "err", err)
}
conf.P2P.PrivateKey = crypto.ToECDSA(key)
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey

// initialize the service
serviceFunc, exists := serviceFuncs[serviceName]
if !exists {
log.Crit(fmt.Sprintf("unknown node service %q", serviceName))
}
service := serviceFunc(id)
service := serviceFunc(id, conf.Snapshot)

// use explicit IP address in ListenAddr so that Enode URL is usable
if strings.HasPrefix(conf.P2P.ListenAddr, ":") {
if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") {
addrs, err := net.InterfaceAddrs()
if err != nil {
log.Crit("error getting IP address", "err", err)
}
for _, addr := range addrs {
if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() {
conf.P2P.ListenAddr = ip.IP.String() + conf.P2P.ListenAddr
conf.Stack.P2P.ListenAddr = ip.IP.String() + conf.Stack.P2P.ListenAddr
break
}
}
}

// start the devp2p stack
stack, err := startP2PNode(&conf, service)
stack, err := startP2PNode(&conf.Stack, service)
if err != nil {
log.Crit("error starting p2p node", "err", err)
}
Expand Down Expand Up @@ -328,7 +327,7 @@ func startP2PNode(conf *node.Config, service node.Service) (*node.Node, error) {
return nil, err
}
constructor := func(ctx *node.ServiceContext) (node.Service, error) {
return service, nil
return &snapshotService{service}, nil
}
if err := stack.Register(constructor); err != nil {
return nil, err
Expand All @@ -339,6 +338,34 @@ func startP2PNode(conf *node.Config, service node.Service) (*node.Node, error) {
return stack, nil
}

// snapshotService wraps a node.Service and injects a snapshot API into the
// list of RPC APIs
type snapshotService struct {
node.Service
}

func (s *snapshotService) APIs() []rpc.API {
return append([]rpc.API{{
Namespace: "simulation",
Version: "1.0",
Service: SnapshotAPI{s.Service},
}}, s.Service.APIs()...)
}

// SnapshotAPI provides an RPC method to create a snapshot of a node.Service
type SnapshotAPI struct {
service node.Service
}

func (api SnapshotAPI) Snapshot() ([]byte, error) {
if s, ok := api.service.(interface {
Snapshot() ([]byte, error)
}); ok {
return s.Snapshot()
}
return nil, nil
}

// stdioConn wraps os.Stdin / os.Stdout with a no-op Close method so we can
// use stdio for RPC messages
type stdioConn struct {
Expand Down
Loading

0 comments on commit d7eb6b5

Please sign in to comment.