From c42b501f7e0a2e2289353a3611d49acf3f8a2834 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 12 Apr 2015 15:39:04 -0700 Subject: [PATCH 1/2] remove msgio double wrap There was doublewrapping with an unneeded msgio. given that we use a stream muxer now, msgio is only needed by secureConn -- to signal the boundaries of an encrypted / mac-ed ciphertext. Side note: i think including the varint length in the clear is actually a bad idea that can be exploited by an attacker. it should be encrypted, too. (TODO) --- p2p/net/conn/conn.go | 28 +++------------------------- p2p/net/conn/conn_test.go | 30 ++++++++++++++++++++++-------- p2p/net/conn/dial_test.go | 10 +++++----- p2p/net/conn/interface.go | 5 ++--- p2p/net/conn/secure_conn.go | 14 -------------- p2p/net/conn/secure_conn_test.go | 11 +++++++---- 6 files changed, 39 insertions(+), 59 deletions(-) diff --git a/p2p/net/conn/conn.go b/p2p/net/conn/conn.go index 8b617515998..d74181e02ae 100644 --- a/p2p/net/conn/conn.go +++ b/p2p/net/conn/conn.go @@ -6,7 +6,6 @@ import ( "net" "time" - msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" mpool "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio/mpool" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" @@ -32,7 +31,6 @@ type singleConn struct { local peer.ID remote peer.ID maconn manet.Conn - msgrw msgio.ReadWriteCloser event io.Closer } @@ -44,7 +42,6 @@ func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn local: local, remote: remote, maconn: maconn, - msgrw: msgio.NewReadWriter(maconn), event: log.EventBegin(ctx, "connLifetime", ml), } @@ -62,7 +59,7 @@ func (c *singleConn) Close() error { }() // close underlying connection - return c.msgrw.Close() + return c.maconn.Close() } // ID is an identifier unique to this connection. @@ -123,31 +120,12 @@ func (c *singleConn) RemotePeer() peer.ID { // Read reads data, net.Conn style func (c *singleConn) Read(buf []byte) (int, error) { - return c.msgrw.Read(buf) + return c.maconn.Read(buf) } // Write writes data, net.Conn style func (c *singleConn) Write(buf []byte) (int, error) { - return c.msgrw.Write(buf) -} - -func (c *singleConn) NextMsgLen() (int, error) { - return c.msgrw.NextMsgLen() -} - -// ReadMsg reads data, net.Conn style -func (c *singleConn) ReadMsg() ([]byte, error) { - return c.msgrw.ReadMsg() -} - -// WriteMsg writes data, net.Conn style -func (c *singleConn) WriteMsg(buf []byte) error { - return c.msgrw.WriteMsg(buf) -} - -// ReleaseMsg releases a buffer -func (c *singleConn) ReleaseMsg(m []byte) { - c.msgrw.ReleaseMsg(m) + return c.maconn.Write(buf) } // ID returns the ID of a given Conn. diff --git a/p2p/net/conn/conn_test.go b/p2p/net/conn/conn_test.go index 03e09d86984..25b23072b1b 100644 --- a/p2p/net/conn/conn_test.go +++ b/p2p/net/conn/conn_test.go @@ -8,17 +8,25 @@ import ( "testing" "time" + msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis" ) +func msgioWrap(c Conn) msgio.ReadWriter { + return msgio.NewReadWriter(c) +} + func testOneSendRecv(t *testing.T, c1, c2 Conn) { + mc1 := msgioWrap(c1) + mc2 := msgioWrap(c2) + log.Debugf("testOneSendRecv from %s to %s", c1.LocalPeer(), c2.LocalPeer()) m1 := []byte("hello") - if err := c1.WriteMsg(m1); err != nil { + if err := mc1.WriteMsg(m1); err != nil { t.Fatal(err) } - m2, err := c2.ReadMsg() + m2, err := mc2.ReadMsg() if err != nil { t.Fatal(err) } @@ -28,11 +36,14 @@ func testOneSendRecv(t *testing.T, c1, c2 Conn) { } func testNotOneSendRecv(t *testing.T, c1, c2 Conn) { + mc1 := msgioWrap(c1) + mc2 := msgioWrap(c2) + m1 := []byte("hello") - if err := c1.WriteMsg(m1); err == nil { + if err := mc1.WriteMsg(m1); err == nil { t.Fatal("write should have failed", err) } - _, err := c2.ReadMsg() + _, err := mc2.ReadMsg() if err == nil { t.Fatal("read should have failed", err) } @@ -72,10 +83,13 @@ func TestCloseLeak(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) c1, c2, _, _ := setupSingleConn(t, ctx) + mc1 := msgioWrap(c1) + mc2 := msgioWrap(c2) + for i := 0; i < num; i++ { b1 := []byte(fmt.Sprintf("beep%d", i)) - c1.WriteMsg(b1) - b2, err := c2.ReadMsg() + mc1.WriteMsg(b1) + b2, err := mc2.ReadMsg() if err != nil { panic(err) } @@ -84,8 +98,8 @@ func TestCloseLeak(t *testing.T) { } b2 = []byte(fmt.Sprintf("boop%d", i)) - c2.WriteMsg(b2) - b1, err = c1.ReadMsg() + mc2.WriteMsg(b2) + b1, err = mc1.ReadMsg() if err != nil { panic(err) } diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 4c5b584bc8a..b4c5cf40023 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -161,10 +161,10 @@ func testDialer(t *testing.T, secure bool) { } // fmt.Println("sending") - c.WriteMsg([]byte("beep")) - c.WriteMsg([]byte("boop")) - - out, err := c.ReadMsg() + mc := msgioWrap(c) + mc.WriteMsg([]byte("beep")) + mc.WriteMsg([]byte("boop")) + out, err := mc.ReadMsg() if err != nil { t.Fatal(err) } @@ -175,7 +175,7 @@ func testDialer(t *testing.T, secure bool) { t.Error("unexpected conn output", data) } - out, err = c.ReadMsg() + out, err = mc.ReadMsg() if err != nil { t.Fatal(err) } diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 82008593057..d505f9e7a4b 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -10,7 +10,6 @@ import ( filter "github.com/ipfs/go-ipfs/p2p/net/filter" peer "github.com/ipfs/go-ipfs/p2p/peer" - msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" ) @@ -46,8 +45,8 @@ type Conn interface { SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error - msgio.Reader - msgio.Writer + io.Reader + io.Writer } // Dialer is an object that can open connections. We could have a "convenience" diff --git a/p2p/net/conn/secure_conn.go b/p2p/net/conn/secure_conn.go index f5ac698e62f..4e786c4b271 100644 --- a/p2p/net/conn/secure_conn.go +++ b/p2p/net/conn/secure_conn.go @@ -119,20 +119,6 @@ func (c *secureConn) Write(buf []byte) (int, error) { return c.secure.ReadWriter().Write(buf) } -func (c *secureConn) NextMsgLen() (int, error) { - return c.secure.ReadWriter().NextMsgLen() -} - -// ReadMsg reads data, net.Conn style -func (c *secureConn) ReadMsg() ([]byte, error) { - return c.secure.ReadWriter().ReadMsg() -} - -// WriteMsg writes data, net.Conn style -func (c *secureConn) WriteMsg(buf []byte) error { - return c.secure.ReadWriter().WriteMsg(buf) -} - // ReleaseMsg releases a buffer func (c *secureConn) ReleaseMsg(m []byte) { c.secure.ReadWriter().ReleaseMsg(m) diff --git a/p2p/net/conn/secure_conn_test.go b/p2p/net/conn/secure_conn_test.go index f027b6a4c6d..9f5a53794ee 100644 --- a/p2p/net/conn/secure_conn_test.go +++ b/p2p/net/conn/secure_conn_test.go @@ -145,13 +145,16 @@ func TestSecureCloseLeak(t *testing.T) { } runPair := func(c1, c2 Conn, num int) { + mc1 := msgioWrap(c1) + mc2 := msgioWrap(c2) + log.Debugf("runPair %d", num) for i := 0; i < num; i++ { log.Debugf("runPair iteration %d", i) b1 := []byte("beep") - c1.WriteMsg(b1) - b2, err := c2.ReadMsg() + mc1.WriteMsg(b1) + b2, err := mc2.ReadMsg() if err != nil { panic(err) } @@ -160,8 +163,8 @@ func TestSecureCloseLeak(t *testing.T) { } b2 = []byte("beep") - c2.WriteMsg(b2) - b1, err = c1.ReadMsg() + mc2.WriteMsg(b2) + b1, err = mc1.ReadMsg() if err != nil { panic(err) } From 956e398768b2e28f2dbd565efe3dbb56d4f1fa15 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 2 Jun 2015 11:47:05 -0700 Subject: [PATCH 2/2] buffer msgio --- .../src/github.com/jbenet/go-msgio/msgio.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go index 6547e68901a..b2f8c206571 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-msgio/msgio.go @@ -1,6 +1,7 @@ package msgio import ( + "bufio" "encoding/binary" "errors" "io" @@ -79,7 +80,8 @@ type ReadWriteCloser interface { // writer is the underlying type that implements the Writer interface. type writer struct { - W io.Writer + W io.Writer + buf *bufio.Writer lock sync.Locker } @@ -87,7 +89,7 @@ type writer struct { // NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer // will write the length prefix of every message written. func NewWriter(w io.Writer) WriteCloser { - return &writer{W: w, lock: new(sync.Mutex)} + return &writer{W: w, buf: bufio.NewWriter(w), lock: new(sync.Mutex)} } func (s *writer) Write(msg []byte) (int, error) { @@ -103,11 +105,16 @@ func (s *writer) WriteMsg(msg []byte) (err error) { defer s.lock.Unlock() length := uint32(len(msg)) - if err := binary.Write(s.W, NBO, &length); err != nil { + if err := binary.Write(s.buf, NBO, &length); err != nil { return err } - _, err = s.W.Write(msg) - return err + + _, err = s.buf.Write(msg) + if err != nil { + return err + } + + return s.buf.Flush() } func (s *writer) Close() error {