Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/simulations: use github.com/gorilla/websocket #20289

Merged
merged 4 commits into from
Nov 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions p2p/simulations/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/websocket"
"github.com/gorilla/websocket"
)

func init() {
Expand Down Expand Up @@ -118,7 +118,7 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) {
conf.Stack.P2P.NAT = nil
conf.Stack.NoUSB = true

// listen on a localhost port, which we set when we
// Listen on a localhost port, which we set when we
// initialise NodeConfig (usually a random port)
conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port)

Expand Down Expand Up @@ -205,17 +205,17 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) {
}
n.Cmd = cmd

// read the WebSocket address from the stderr logs
// Wait for the node to start.
status := <-statusC
if status.Err != "" {
return errors.New(status.Err)
}
client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "http://localhost")
client, err := rpc.DialWebsocket(ctx, status.WSEndpoint, "")
if err != nil {
return fmt.Errorf("can't connect to RPC server: %v", err)
}

// node ready :)
// Node ready :)
n.client = client
n.wsAddr = status.WSEndpoint
n.Info = status.NodeInfo
Expand Down Expand Up @@ -314,31 +314,37 @@ func (n *ExecNode) NodeInfo() *p2p.NodeInfo {

// ServeRPC serves RPC requests over the given connection by dialling the
// node's WebSocket address and joining the two connections
func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error {
conn, _, err := websocket.DefaultDialer.Dial(n.wsAddr, nil)
if err != nil {
return err
}
var wg sync.WaitGroup
wg.Add(2)
join := func(src, dst net.Conn) {
defer wg.Done()
io.Copy(dst, src)
// close the write end of the destination connection
if cw, ok := dst.(interface {
CloseWrite() error
}); ok {
cw.CloseWrite()
} else {
dst.Close()
}
}
go join(conn, clientConn)
go join(clientConn, conn)
go wsCopy(&wg, conn, clientConn)
go wsCopy(&wg, clientConn, conn)
wg.Wait()
conn.Close()
return nil
}

func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) {
defer wg.Done()
for {
msgType, r, err := src.NextReader()
if err != nil {
return
}
w, err := dst.NextWriter(msgType)
if err != nil {
return
}
if _, err = io.Copy(w, r); err != nil {
return
}
}
}

// Snapshots creates snapshots of the services by calling the
// simulation_snapshot RPC method
func (n *ExecNode) Snapshots() (map[string][]byte, error) {
Expand Down
8 changes: 5 additions & 3 deletions p2p/simulations/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/pipes"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
)

// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and
Expand Down Expand Up @@ -210,13 +211,14 @@ func (sn *SimNode) Client() (*rpc.Client, error) {
}

// ServeRPC serves RPC requests over the given connection by creating an
// in-memory client to the node's RPC server
func (sn *SimNode) ServeRPC(conn net.Conn) error {
// in-memory client to the node's RPC server.
func (sn *SimNode) ServeRPC(conn *websocket.Conn) error {
handler, err := sn.node.RPCHandler()
if err != nil {
return err
}
handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
codec := rpc.NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
handler.ServeCodec(codec, 0)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/simulations/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
)

// Node represents a node in a simulation network which is created by a
Expand All @@ -51,7 +52,7 @@ type Node interface {
Client() (*rpc.Client, error)

// ServeRPC serves RPC requests over the given connection
ServeRPC(net.Conn) error
ServeRPC(*websocket.Conn) error

// Start starts the node with the given snapshots
Start(snapshots map[string][]byte) error
Expand Down
18 changes: 11 additions & 7 deletions p2p/simulations/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/websocket"
"github.com/julienschmidt/httprouter"
"golang.org/x/net/websocket"
)

// DefaultClient is the default simulation API client which expects the API
Expand Down Expand Up @@ -654,16 +654,20 @@ func (s *Server) Options(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}

var wsUpgrade = websocket.Upgrader{
CheckOrigin: func(*http.Request) bool { return true },
}

// NodeRPC forwards RPC requests to a node in the network via a WebSocket
// connection
func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) {
node := req.Context().Value("node").(*Node)

handler := func(conn *websocket.Conn) {
node.ServeRPC(conn)
conn, err := wsUpgrade.Upgrade(w, req, nil)
if err != nil {
return
}

websocket.Server{Handler: handler}.ServeHTTP(w, req)
defer conn.Close()
node := req.Context().Value("node").(*Node)
node.ServeRPC(conn)
}

// ServeHTTP implements the http.Handler interface by delegating to the
Expand Down
12 changes: 6 additions & 6 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {

func (cc *clientConn) close(err error, inflightReq *requestOp) {
cc.handler.close(err, inflightReq)
cc.codec.Close()
cc.codec.close()
}

type readOp struct {
Expand Down Expand Up @@ -484,7 +484,7 @@ func (c *Client) write(ctx context.Context, msg interface{}) error {
return err
}
}
err := c.writeConn.Write(ctx, msg)
err := c.writeConn.writeJSON(ctx, msg)
if err != nil {
c.writeConn = nil
}
Expand All @@ -511,7 +511,7 @@ func (c *Client) reconnect(ctx context.Context) error {
c.writeConn = newconn
return nil
case <-c.didClose:
newconn.Close()
newconn.close()
return ErrClientQuit
}
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func (c *Client) dispatch(codec ServerCodec) {

// Reconnect:
case newcodec := <-c.reconnected:
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr())
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.remoteAddr())
if reading {
// Wait for the previous read loop to exit. This is a rare case which
// happens if this loop isn't notified in time after the connection breaks.
Expand Down Expand Up @@ -612,9 +612,9 @@ func (c *Client) drainRead() {
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
for {
msgs, batch, err := codec.Read()
msgs, batch, err := codec.readBatch()
if _, ok := err.(*json.SyntaxError); ok {
codec.Write(context.Background(), errorMessage(&parseError{err.Error()}))
codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
Expand Down
10 changes: 5 additions & 5 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
}
if conn.RemoteAddr() != "" {
h.log = h.log.New("conn", conn.RemoteAddr())
if conn.remoteAddr() != "" {
h.log = h.log.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
return h
Expand All @@ -97,7 +97,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
})
return
}
Expand All @@ -122,7 +122,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
h.conn.Write(cp.ctx, answers)
h.conn.writeJSON(cp.ctx, answers)
}
for _, n := range cp.notifiers {
n.activate()
Expand All @@ -139,7 +139,7 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) {
answer := h.handleCallMsg(cp, msg)
h.addSubscriptions(cp.notifiers)
if answer != nil {
h.conn.Write(cp.ctx, answer)
h.conn.writeJSON(cp.ctx, answer)
}
for _, n := range cp.notifiers {
n.activate()
Expand Down
26 changes: 13 additions & 13 deletions rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,29 @@ type httpConn struct {
client *http.Client
req *http.Request
closeOnce sync.Once
closed chan interface{}
closeCh chan interface{}
}

// httpConn is treated specially by Client.
func (hc *httpConn) Write(context.Context, interface{}) error {
panic("Write called on httpConn")
func (hc *httpConn) writeJSON(context.Context, interface{}) error {
panic("writeJSON called on httpConn")
}

func (hc *httpConn) RemoteAddr() string {
func (hc *httpConn) remoteAddr() string {
return hc.req.URL.String()
}

func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) {
<-hc.closed
func (hc *httpConn) readBatch() ([]*jsonrpcMessage, bool, error) {
<-hc.closeCh
return nil, false, io.EOF
}

func (hc *httpConn) Close() {
hc.closeOnce.Do(func() { close(hc.closed) })
func (hc *httpConn) close() {
hc.closeOnce.Do(func() { close(hc.closeCh) })
}

func (hc *httpConn) Closed() <-chan interface{} {
return hc.closed
func (hc *httpConn) closed() <-chan interface{} {
return hc.closeCh
}

// HTTPTimeouts represents the configuration params for the HTTP RPC server.
Expand Down Expand Up @@ -116,7 +116,7 @@ func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) {

initctx := context.Background()
return newClient(initctx, func(context.Context) (ServerCodec, error) {
return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil
return &httpConn{client: client, req: req, closeCh: make(chan interface{})}, nil
})
}

Expand Down Expand Up @@ -195,7 +195,7 @@ type httpServerConn struct {
func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec {
body := io.LimitReader(r.Body, maxRequestContentLength)
conn := &httpServerConn{Reader: body, Writer: w, r: r}
return NewJSONCodec(conn)
return NewCodec(conn)
}

// Close does nothing and always returns nil.
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.Close()
defer codec.close()
s.serveSingleRequest(ctx, codec)
}

Expand Down
4 changes: 2 additions & 2 deletions rpc/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func DialInProc(handler *Server) *Client {
initctx := context.Background()
c, _ := newClient(initctx, func(context.Context) (ServerCodec, error) {
p1, p2 := net.Pipe()
go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
return NewJSONCodec(p2), nil
go handler.ServeCodec(NewCodec(p1), 0)
return NewCodec(p2), nil
})
return c
}
4 changes: 2 additions & 2 deletions rpc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Server) ServeListener(l net.Listener) error {
return err
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
go s.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
go s.ServeCodec(NewCodec(conn), 0)
}
}

Expand All @@ -51,6 +51,6 @@ func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
if err != nil {
return nil, err
}
return NewJSONCodec(conn), err
return NewCodec(conn), err
})
}
Loading