Skip to content

Commit

Permalink
Merge pull request #22 from ipfs/feat/fallback-dialer
Browse files Browse the repository at this point in the history
Add fallback dialer
  • Loading branch information
whyrusleeping committed Mar 5, 2016
2 parents 03aa3d2 + 5ec1f55 commit be1f558
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ os:
language: go

go:
- 1.5.1
- 1.5.2

env:
- GO15VENDOREXPERIMENT=1
Expand Down
5 changes: 5 additions & 0 deletions p2p/net/conn/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer {
LocalPeer: p,
PrivateKey: pk,
Wrapper: wrap,
fallback: new(transport.FallbackDialer),
}
}

Expand Down Expand Up @@ -115,6 +116,10 @@ func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer {
}
}

if d.fallback.Matches(raddr) {
return d.fallback
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions p2p/net/conn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Dialer struct {

// Wrapper to wrap the raw connection (optional)
Wrapper WrapFunc

fallback transport.Dialer
}

// Listener is an object that can accept connections. It matches net.Listener
Expand Down
55 changes: 55 additions & 0 deletions p2p/net/swarm/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,61 @@ import (
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)

func closeSwarms(swarms []*Swarm) {
for _, s := range swarms {
s.Close()
}
}

func TestBasicDial(t *testing.T) {
t.Parallel()
ctx := context.Background()

swarms := makeSwarms(ctx, t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]

s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL)

c, err := s1.Dial(ctx, s2.local)
if err != nil {
t.Fatal(err)
}

s, err := c.NewStream()
if err != nil {
t.Fatal(err)
}

s.Close()
}

func TestDialWithNoListeners(t *testing.T) {
t.Parallel()
ctx := context.Background()

s1 := makeDialOnlySwarm(ctx, t)

swarms := makeSwarms(ctx, t, 1)
defer closeSwarms(swarms)
s2 := swarms[0]

s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL)

c, err := s1.Dial(ctx, s2.local)
if err != nil {
t.Fatal(err)
}

s, err := c.NewStream()
if err != nil {
t.Fatal(err)
}

s.Close()
}

func acceptAndHang(l net.Listener) {
conns := make([]net.Conn, 0, 10)
for {
Expand Down
25 changes: 21 additions & 4 deletions p2p/net/swarm/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,48 @@ func EchoStreamHandler(stream inet.Stream) {

// pull out the ipfs conn
c := stream.Conn()
log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer())

buf := make([]byte, 4)

for {
if _, err := stream.Read(buf); err != nil {
if err != io.EOF {
log.Info("ping receive error:", err)
log.Error("ping receive error:", err)
}
return
}

if !bytes.Equal(buf, []byte("ping")) {
log.Infof("ping receive error: ping != %s %v", buf, buf)
log.Errorf("ping receive error: ping != %s %v", buf, buf)
return
}

if _, err := stream.Write([]byte("pong")); err != nil {
log.Info("pond send error:", err)
log.Error("pond send error:", err)
return
}
}
}()
}

func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm {
id := testutil.RandIdentityOrFatal(t)

peerstore := peer.NewPeerstore()
peerstore.AddPubKey(id.ID(), id.PublicKey())
peerstore.AddPrivKey(id.ID(), id.PrivateKey())

swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter())
if err != nil {
t.Fatal(err)
}

swarm.SetStreamHandler(EchoStreamHandler)

return swarm
}

func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm {
swarms := make([]*Swarm, 0, num)

Expand Down
64 changes: 64 additions & 0 deletions p2p/net/transport/fallback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package transport

import (
"fmt"

manet "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net"
mautp "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net/utp"
utp "gx/ipfs/QmVs3wq4cN64TFCxANzgSHjGPrjMnRnwPrxU8bqc7YP42s/utp"
mafmt "gx/ipfs/QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx/mafmt"
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)

type FallbackDialer struct {
madialer manet.Dialer
}

func (fbd *FallbackDialer) Matches(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a) || mafmt.UTP.Matches(a)
}

func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) {
if mafmt.TCP.Matches(a) {
return fbd.tcpDial(a)
}
if mafmt.UTP.Matches(a) {
return fbd.tcpDial(a)
}
return nil, fmt.Errorf("cannot dial %s with fallback dialer", a)
}

func (fbd *FallbackDialer) tcpDial(raddr ma.Multiaddr) (Conn, error) {
var c manet.Conn
var err error
c, err = fbd.madialer.Dial(raddr)

if err != nil {
return nil, err
}

return &connWrap{
Conn: c,
}, nil
}

func (fbd *FallbackDialer) utpDial(raddr ma.Multiaddr) (Conn, error) {
_, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}

con, err := utp.Dial(addr)
if err != nil {
return nil, err
}

mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con})
if err != nil {
return nil, err
}

return &connWrap{
Conn: mnc,
}, nil
}
135 changes: 135 additions & 0 deletions p2p/net/transport/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package transport

import (
"fmt"
"io"
"testing"

ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)

func TestTcpTransport(t *testing.T) {
ta := NewTCPTransport()
tb := NewTCPTransport()

zero := "/ip4/127.0.0.1/tcp/0"
subtestTransport(t, ta, tb, zero)
}

func TestUtpTransport(t *testing.T) {
ta := NewUtpTransport()
tb := NewUtpTransport()

zero := "/ip4/127.0.0.1/udp/0/utp"
subtestTransport(t, ta, tb, zero)
}

func subtestTransport(t *testing.T, ta, tb Transport, addr string) {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}

list, err := ta.Listen(maddr)
if err != nil {
t.Fatal(err)
}

dialer, err := tb.Dialer(maddr)
if err != nil {
t.Fatal(err)
}

accepted := make(chan Conn, 1)
errs := make(chan error, 1)
go func() {
b, err := list.Accept()
if err != nil {
errs <- err
return
}

accepted <- b
}()

a, err := dialer.Dial(list.Multiaddr())
if err != nil {
t.Fatal(err)
}

var b Conn
select {
case b = <-accepted:
case err := <-errs:
t.Fatal(err)
}

defer a.Close()
defer b.Close()

err = checkDataTransfer(a, b)
if err != nil {
t.Fatal(err)
}

}

func checkDataTransfer(a, b io.ReadWriter) error {
errs := make(chan error, 2)
data := []byte("this is some test data")

go func() {
n, err := a.Write(data)
if err != nil {
errs <- err
return
}

if n != len(data) {
errs <- fmt.Errorf("failed to write enough data (a->b)")
return
}

buf := make([]byte, len(data))
_, err = io.ReadFull(a, buf)
if err != nil {
errs <- err
return
}

errs <- nil
}()

go func() {
buf := make([]byte, len(data))
_, err := io.ReadFull(b, buf)
if err != nil {
errs <- err
return
}

n, err := b.Write(data)
if err != nil {
errs <- err
return
}

if n != len(data) {
errs <- fmt.Errorf("failed to write enough data (b->a)")
return
}

errs <- nil
}()

err := <-errs
if err != nil {
return err
}
err = <-errs
if err != nil {
return err
}

return nil
}

0 comments on commit be1f558

Please sign in to comment.