From 1b84a1cc9d22dbe3fb2a43c3474fca9e3da8471c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 4 Mar 2016 09:27:52 -0800 Subject: [PATCH 1/2] Add fallback dialer Previously we were unable to dial on a given transport if there were no listener addresses defined for that. This meant that most people couldnt use utp (as they had no utp listener transport to dial from). --- p2p/net/conn/dial.go | 5 +++ p2p/net/conn/interface.go | 2 ++ p2p/net/swarm/dial_test.go | 55 ++++++++++++++++++++++++++++++ p2p/net/swarm/swarm_test.go | 25 +++++++++++--- p2p/net/transport/fallback.go | 63 +++++++++++++++++++++++++++++++++++ 5 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 p2p/net/transport/fallback.go diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index 00b3566fd7..3d0101da9d 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -22,6 +22,7 @@ func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer { LocalPeer: p, PrivateKey: pk, Wrapper: wrap, + fallback: new(transport.FallbackDialer), } } @@ -115,6 +116,10 @@ func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer { } } + if d.fallback.Matches(raddr) { + return d.fallback + } + return nil } diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 3da7e9aea9..babc2ff221 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -65,6 +65,8 @@ type Dialer struct { // Wrapper to wrap the raw connection (optional) Wrapper WrapFunc + + fallback transport.Dialer } // Listener is an object that can accept connections. It matches net.Listener diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 66bac550a8..a6bc84e861 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -18,6 +18,61 @@ import ( ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ) +func closeSwarms(swarms []*Swarm) { + for _, s := range swarms { + s.Close() + } +} + +func TestBasicDial(t *testing.T) { + t.Parallel() + ctx := context.Background() + + swarms := makeSwarms(ctx, t, 2) + defer closeSwarms(swarms) + s1 := swarms[0] + s2 := swarms[1] + + s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL) + + c, err := s1.Dial(ctx, s2.local) + if err != nil { + t.Fatal(err) + } + + s, err := c.NewStream() + if err != nil { + t.Fatal(err) + } + + s.Close() +} + +func TestDialWithNoListeners(t *testing.T) { + t.Parallel() + ctx := context.Background() + + s1 := makeDialOnlySwarm(ctx, t) + + swarms := makeSwarms(ctx, t, 1) + defer closeSwarms(swarms) + s2 := swarms[0] + + s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL) + + c, err := s1.Dial(ctx, s2.local) + if err != nil { + t.Fatal(err) + } + + s, err := c.NewStream() + if err != nil { + t.Fatal(err) + } + + s.Close() +} + func acceptAndHang(l net.Listener) { conns := make([]net.Conn, 0, 10) for { diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 121d1b0f35..fe2ef2a207 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -24,31 +24,48 @@ func EchoStreamHandler(stream inet.Stream) { // pull out the ipfs conn c := stream.Conn() - log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) + log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) buf := make([]byte, 4) for { if _, err := stream.Read(buf); err != nil { if err != io.EOF { - log.Info("ping receive error:", err) + log.Error("ping receive error:", err) } return } if !bytes.Equal(buf, []byte("ping")) { - log.Infof("ping receive error: ping != %s %v", buf, buf) + log.Errorf("ping receive error: ping != %s %v", buf, buf) return } if _, err := stream.Write([]byte("pong")); err != nil { - log.Info("pond send error:", err) + log.Error("pond send error:", err) return } } }() } +func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm { + id := testutil.RandIdentityOrFatal(t) + + peerstore := peer.NewPeerstore() + peerstore.AddPubKey(id.ID(), id.PublicKey()) + peerstore.AddPrivKey(id.ID(), id.PrivateKey()) + + swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter()) + if err != nil { + t.Fatal(err) + } + + swarm.SetStreamHandler(EchoStreamHandler) + + return swarm +} + func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm { swarms := make([]*Swarm, 0, num) diff --git a/p2p/net/transport/fallback.go b/p2p/net/transport/fallback.go new file mode 100644 index 0000000000..ca8afe2dfb --- /dev/null +++ b/p2p/net/transport/fallback.go @@ -0,0 +1,63 @@ +package transport + +import ( + "fmt" + + manet "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net" + mautp "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net/utp" + utp "gx/ipfs/QmVs3wq4cN64TFCxANzgSHjGPrjMnRnwPrxU8bqc7YP42s/utp" + mafmt "gx/ipfs/QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx/mafmt" + ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" +) + +type FallbackDialer struct { + madialer manet.Dialer +} + +func (fbd *FallbackDialer) Matches(a ma.Multiaddr) bool { + return mafmt.TCP.Matches(a) || mafmt.UTP.Matches(a) +} + +func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) { + if mafmt.TCP.Matches(a) { + return fbd.tcpDial(a) + } + if mafmt.UTP.Matches(a) { + } + return nil, fmt.Errorf("cannot dial %s with fallback dialer", a) +} + +func (fbd *FallbackDialer) tcpDial(raddr ma.Multiaddr) (Conn, error) { + var c manet.Conn + var err error + c, err = fbd.madialer.Dial(raddr) + + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: c, + }, nil +} + +func (fbd *FallbackDialer) utpDial(raddr ma.Multiaddr) (Conn, error) { + _, addr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + con, err := utp.Dial(addr) + if err != nil { + return nil, err + } + + mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con}) + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: mnc, + }, nil +} From 5ec1f553f0ac1a5674742d1dcd5036bcebf256a2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 4 Mar 2016 16:16:58 -0800 Subject: [PATCH 2/2] fix fallback code and add a few new tests --- .travis.yml | 2 +- p2p/net/transport/fallback.go | 1 + p2p/net/transport/transport_test.go | 135 ++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 p2p/net/transport/transport_test.go diff --git a/.travis.yml b/.travis.yml index cf9fec9168..3ed1cf5ab6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ os: language: go go: - - 1.5.1 + - 1.5.2 env: - GO15VENDOREXPERIMENT=1 diff --git a/p2p/net/transport/fallback.go b/p2p/net/transport/fallback.go index ca8afe2dfb..534bc3849f 100644 --- a/p2p/net/transport/fallback.go +++ b/p2p/net/transport/fallback.go @@ -23,6 +23,7 @@ func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) { return fbd.tcpDial(a) } if mafmt.UTP.Matches(a) { + return fbd.tcpDial(a) } return nil, fmt.Errorf("cannot dial %s with fallback dialer", a) } diff --git a/p2p/net/transport/transport_test.go b/p2p/net/transport/transport_test.go new file mode 100644 index 0000000000..b81cdaefee --- /dev/null +++ b/p2p/net/transport/transport_test.go @@ -0,0 +1,135 @@ +package transport + +import ( + "fmt" + "io" + "testing" + + ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" +) + +func TestTcpTransport(t *testing.T) { + ta := NewTCPTransport() + tb := NewTCPTransport() + + zero := "/ip4/127.0.0.1/tcp/0" + subtestTransport(t, ta, tb, zero) +} + +func TestUtpTransport(t *testing.T) { + ta := NewUtpTransport() + tb := NewUtpTransport() + + zero := "/ip4/127.0.0.1/udp/0/utp" + subtestTransport(t, ta, tb, zero) +} + +func subtestTransport(t *testing.T, ta, tb Transport, addr string) { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + t.Fatal(err) + } + + list, err := ta.Listen(maddr) + if err != nil { + t.Fatal(err) + } + + dialer, err := tb.Dialer(maddr) + if err != nil { + t.Fatal(err) + } + + accepted := make(chan Conn, 1) + errs := make(chan error, 1) + go func() { + b, err := list.Accept() + if err != nil { + errs <- err + return + } + + accepted <- b + }() + + a, err := dialer.Dial(list.Multiaddr()) + if err != nil { + t.Fatal(err) + } + + var b Conn + select { + case b = <-accepted: + case err := <-errs: + t.Fatal(err) + } + + defer a.Close() + defer b.Close() + + err = checkDataTransfer(a, b) + if err != nil { + t.Fatal(err) + } + +} + +func checkDataTransfer(a, b io.ReadWriter) error { + errs := make(chan error, 2) + data := []byte("this is some test data") + + go func() { + n, err := a.Write(data) + if err != nil { + errs <- err + return + } + + if n != len(data) { + errs <- fmt.Errorf("failed to write enough data (a->b)") + return + } + + buf := make([]byte, len(data)) + _, err = io.ReadFull(a, buf) + if err != nil { + errs <- err + return + } + + errs <- nil + }() + + go func() { + buf := make([]byte, len(data)) + _, err := io.ReadFull(b, buf) + if err != nil { + errs <- err + return + } + + n, err := b.Write(data) + if err != nil { + errs <- err + return + } + + if n != len(data) { + errs <- fmt.Errorf("failed to write enough data (b->a)") + return + } + + errs <- nil + }() + + err := <-errs + if err != nil { + return err + } + err = <-errs + if err != nil { + return err + } + + return nil +}