Skip to content

Commit

Permalink
Merge pull request ethereum#86 from ethersphere/network-testing-frame…
Browse files Browse the repository at this point in the history
…work-rpc-subscribe

p2p/simulations: Move PeerEvents to admin RPC namespace
  • Loading branch information
zelig authored May 9, 2017
2 parents 6de0912 + cdab665 commit aa624cd
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 87 deletions.
40 changes: 40 additions & 0 deletions node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package node

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rpc"
"github.com/rcrowley/go-metrics"
)

Expand Down Expand Up @@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}

// PeerEvents creates an RPC subscription which receives peer events from the
// node's p2p.Server
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return nil, ErrNodeStopped
}

// Create the subscription
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()

go func() {
events := make(chan *p2p.PeerEvent)
sub := server.SubscribeEvents(events)
defer sub.Unsubscribe()

for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-sub.Err():
return
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()

return rpcSub, nil
}

// StartRPC starts the HTTP RPC API server.
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) {
api.node.lock.Lock()
Expand Down
77 changes: 3 additions & 74 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,22 +315,10 @@ func startP2PNode(conf *node.Config, service node.Service) (*node.Node, error) {
if err != nil {
return nil, err
}

constructor := func(s node.Service) node.ServiceConstructor {
return func(ctx *node.ServiceContext) (node.Service, error) {
return s, nil
}
}

// register the peer events API
//
// TODO: move this to node.PrivateAdminAPI once the following is merged:
// https://github.com/ethereum/go-ethereum/pull/13885
if err := stack.Register(constructor(&PeerAPI{stack.Server})); err != nil {
return nil, err
constructor := func(ctx *node.ServiceContext) (node.Service, error) {
return service, nil
}

if err := stack.Register(constructor(service)); err != nil {
if err := stack.Register(constructor); err != nil {
return nil, err
}
if err := stack.Start(); err != nil {
Expand All @@ -339,65 +327,6 @@ func startP2PNode(conf *node.Config, service node.Service) (*node.Node, error) {
return stack, nil
}

// PeerAPI is used to expose peer events under the "eth" RPC namespace.
//
// TODO: move this to node.PrivateAdminAPI and expose under the "admin"
// namespace once the following is merged:
// https://github.com/ethereum/go-ethereum/pull/13885
type PeerAPI struct {
server func() p2p.Server
}

func (p *PeerAPI) Protocols() []p2p.Protocol {
return nil
}

func (p *PeerAPI) APIs() []rpc.API {
return []rpc.API{{
Namespace: "eth",
Version: "1.0",
Service: p,
}}
}

func (p *PeerAPI) Start(p2p.Server) error {
return nil
}

func (p *PeerAPI) Stop() error {
return nil
}

// PeerEvents creates an RPC sunscription which receives peer events from the
// underlying p2p.Server
func (p *PeerAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
events := make(chan *p2p.PeerEvent)
sub := p.server().SubscribeEvents(events)
defer sub.Unsubscribe()

for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()

return rpcSub, nil
}

// stdioConn wraps os.Stdin / os.Stdout with a no-op Close method so we can
// use stdio for RPC messages
type stdioConn struct {
Expand Down
40 changes: 34 additions & 6 deletions p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package adapters

import (
"context"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -184,19 +185,14 @@ func (self *SimNode) startRPC() error {
return errors.New("RPC already started")
}

// add SimAdminAPI and PeerAPI so that the network can call the
// add SimAdminAPI so that the network can call the
// AddPeer, RemovePeer and PeerEvents RPC methods
apis := append(self.service.APIs(), []rpc.API{
{
Namespace: "admin",
Version: "1.0",
Service: &SimAdminAPI{self},
},
{
Namespace: "eth",
Version: "1.0",
Service: &PeerAPI{func() p2p.Server { return self }},
},
}...)

// start the RPC handler
Expand Down Expand Up @@ -356,3 +352,35 @@ func (api *SimAdminAPI) RemovePeer(url string) (bool, error) {
api.SimNode.RemovePeer(node)
return true, nil
}

// PeerEvents creates an RPC subscription which receives peer events from the
// underlying p2p.Server
func (api *SimAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
events := make(chan *p2p.PeerEvent)
sub := api.SubscribeEvents(events)
defer sub.Unsubscribe()

for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-sub.Err():
return
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()

return rpcSub, nil
}
2 changes: 1 addition & 1 deletion p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (self *Network) Start(id *adapters.NodeId) error {
return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
}
events := make(chan *p2p.PeerEvent)
sub, err := client.EthSubscribe(context.Background(), events, "peerEvents")
sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
if err != nil {
return fmt.Errorf("error getting peer events for node %v: %s", id, err)
}
Expand Down
15 changes: 10 additions & 5 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,24 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
return err
}

// EthSubscribe calls the "eth_subscribe" method with the given arguments,
// EthSubscribe registers a subscripion under the "eth" namespace.
func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
return c.Subscribe(ctx, "eth", channel, args)
}

// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
//
// The context argument cancels the RPC request that sets up the subscription but has no
// effect on the subscription after EthSubscribe has returned.
// effect on the subscription after Subscribe has returned.
//
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
Expand All @@ -382,14 +387,14 @@ func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...
return nil, ErrNotificationsUnsupported
}

msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...)
msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, "eth", chanVal),
sub: newClientSubscription(c, namespace, chanVal),
}

// Send the subscription request.
Expand Down
32 changes: 32 additions & 0 deletions rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,38 @@ func TestClientSubscribe(t *testing.T) {
}
}

func TestClientSubscribeCustomNamespace(t *testing.T) {
namespace := "custom"
server := newTestServer(namespace, new(NotificationTestService))
defer server.Stop()
client := DialInProc(server)
defer client.Close()

nc := make(chan int)
count := 10
sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
for i := 0; i < count; i++ {
if val := <-nc; val != i {
t.Fatalf("value mismatch: got %d, want %d", val, i)
}
}

sub.Unsubscribe()
select {
case v := <-nc:
t.Fatal("received value after unsubscribe:", v)
case err := <-sub.Err():
if err != nil {
t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
}
case <-time.After(1 * time.Second):
t.Fatalf("subscription not closed within 1s after unsubscribe")
}
}

// In this test, the connection drops while EthSubscribe is
// waiting for a response.
func TestClientSubscribeClose(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/simulations/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func triggerChecks(trigger chan *adapters.NodeId, net *simulations.Network, id *
return err
}
events := make(chan *p2p.PeerEvent)
sub, err := client.EthSubscribe(context.Background(), events, "peerEvents")
sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents")
if err != nil {
return fmt.Errorf("error getting peer events for node %v: %s", id, err)
}
Expand Down

0 comments on commit aa624cd

Please sign in to comment.