Skip to content

Commit

Permalink
Merge pull request #33 from libp2p/feat/refactor
Browse files Browse the repository at this point in the history
Update for transport refactor
  • Loading branch information
Stebalien authored Jun 6, 2018
2 parents 79046ed + 98d6147 commit 2dea38c
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 236 deletions.
33 changes: 1 addition & 32 deletions p2p/protocol/internal/circuitv1-deprecated/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,17 @@ import (
"fmt"
"net"

ic "github.com/libp2p/go-libp2p-crypto"
iconn "github.com/libp2p/go-libp2p-interface-conn"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

type Conn struct {
inet.Stream
remote pstore.PeerInfo
transport tpt.Transport
remote pstore.PeerInfo
}

var _ iconn.Conn = (*Conn)(nil)

type NetAddr struct {
Relay string
Remote string
Expand Down Expand Up @@ -62,27 +55,3 @@ func (c *Conn) LocalAddr() net.Addr {
}
return na
}

func (c *Conn) Transport() tpt.Transport {
return c.transport
}

func (c *Conn) LocalPeer() peer.ID {
return c.Conn().LocalPeer()
}

func (c *Conn) RemotePeer() peer.ID {
return c.remote.ID
}

func (c *Conn) LocalPrivateKey() ic.PrivKey {
return nil
}

func (c *Conn) RemotePublicKey() ic.PubKey {
return nil
}

func (c *Conn) ID() string {
return iconn.ID(c)
}
48 changes: 18 additions & 30 deletions p2p/protocol/internal/circuitv1-deprecated/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,16 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

var _ tpt.Dialer = (*RelayDialer)(nil)

type RelayDialer Relay

func (d *RelayDialer) Relay() *Relay {
return (*Relay)(d)
}

func (r *Relay) Dialer() *RelayDialer {
return (*RelayDialer)(r)
}

func (d *RelayDialer) Dial(a ma.Multiaddr) (tpt.Conn, error) {
return d.DialContext(d.ctx, a)
func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
c, err := d.Relay().Dial(ctx, a)
if err != nil {
return nil, err
}
return d.upgrader.UpgradeOutbound(ctx, d, c, p)
}

func (d *RelayDialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn, error) {
if !d.Matches(a) {
func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr) (*Conn, error) {
if !r.Matches(a) {
return nil, fmt.Errorf("%s is not a relay address", a)
}
parts := ma.Split(a)
Expand All @@ -51,24 +43,25 @@ func (d *RelayDialer) DialContext(ctx context.Context, a ma.Multiaddr) (tpt.Conn

if len(relayaddr.Bytes()) == 0 {
// unspecific relay address, try dialing using known hop relays
return d.tryDialRelays(ctx, *dinfo)
return r.tryDialRelays(ctx, *dinfo)
}

rinfo, err := pstore.InfoFromP2pAddr(relayaddr)
var rinfo *pstore.PeerInfo
rinfo, err = pstore.InfoFromP2pAddr(relayaddr)
if err != nil {
return nil, err
}

return d.Relay().DialPeer(ctx, *rinfo, *dinfo)
return r.DialPeer(ctx, *rinfo, *dinfo)
}

func (d *RelayDialer) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo) (tpt.Conn, error) {
func (r *Relay) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo) (*Conn, error) {
var relays []peer.ID
d.mx.Lock()
for p := range d.relays {
r.mx.Lock()
for p := range r.relays {
relays = append(relays, p)
}
d.mx.Unlock()
r.mx.Unlock()

// shuffle list of relays, avoid overloading a specific relay
for i := range relays {
Expand All @@ -77,12 +70,12 @@ func (d *RelayDialer) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo)
}

for _, relay := range relays {
if len(d.host.Network().ConnsToPeer(relay)) == 0 {
if len(r.host.Network().ConnsToPeer(relay)) == 0 {
continue
}

rctx, cancel := context.WithTimeout(ctx, HopConnectTimeout)
c, err := d.Relay().DialPeer(rctx, pstore.PeerInfo{ID: relay}, dinfo)
c, err := r.DialPeer(rctx, pstore.PeerInfo{ID: relay}, dinfo)
cancel()

if err == nil {
Expand All @@ -94,8 +87,3 @@ func (d *RelayDialer) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo)

return nil, fmt.Errorf("Failed to dial through %d known relay hosts", len(relays))
}

func (d *RelayDialer) Matches(a ma.Multiaddr) bool {
_, err := a.ValueForProtocol(P_CIRCUIT)
return err == nil
}
20 changes: 6 additions & 14 deletions p2p/protocol/internal/circuitv1-deprecated/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (

pb "github.com/libp2p/go-libp2p-circuit/pb"

peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

var _ tpt.Listener = (*RelayListener)(nil)
var _ manet.Listener = (*RelayListener)(nil)

type RelayListener Relay

Expand All @@ -21,10 +19,11 @@ func (l *RelayListener) Relay() *Relay {
}

func (r *Relay) Listener() *RelayListener {
// TODO: Only allow one!
return (*RelayListener)(r)
}

func (l *RelayListener) Accept() (tpt.Conn, error) {
func (l *RelayListener) Accept() (manet.Conn, error) {
select {
case c := <-l.incoming:
err := l.Relay().writeResponse(c.Stream, pb.CircuitRelay_SUCCESS)
Expand All @@ -34,7 +33,8 @@ func (l *RelayListener) Accept() (tpt.Conn, error) {
return nil, err
}

log.Infof("accepted relay connection: %s", c.ID())
// TODO: Pretty print.
log.Infof("accepted relay connection: %s", c)

return c, nil
case <-l.ctx.Done():
Expand All @@ -57,14 +57,6 @@ func (l *RelayListener) Multiaddr() ma.Multiaddr {
return a
}

func (l *RelayListener) LocalPeer() peer.ID {
return l.self
}

func (l *RelayListener) SetAddrFilters(f *filter.Filters) {
// noop ?
}

func (l *RelayListener) Close() error {
// TODO: noop?
return nil
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/internal/circuitv1-deprecated/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (n *RelayNotifiee) OpenedStream(net inet.Network, s inet.Stream) {}
func (n *RelayNotifiee) ClosedStream(net inet.Network, s inet.Stream) {}

func (n *RelayNotifiee) Connected(s inet.Network, c inet.Conn) {
if n.Relay().Transport().Matches(c.RemoteMultiaddr()) {
if n.Relay().Matches(c.RemoteMultiaddr()) {
return
}

Expand Down
38 changes: 24 additions & 14 deletions p2p/protocol/internal/circuitv1-deprecated/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("relay")
Expand All @@ -26,9 +28,10 @@ var RelayAcceptTimeout = time.Minute
var HopConnectTimeout = 10 * time.Second

type Relay struct {
host host.Host
ctx context.Context
self peer.ID
host host.Host
upgrader *tptu.Upgrader
ctx context.Context
self peer.ID

active bool
hop bool
Expand All @@ -54,8 +57,9 @@ func (e RelayError) Error() string {
return fmt.Sprintf("error opening relay circuit: %s (%d)", pb.CircuitRelay_Status_name[int32(e.Code)], e.Code)
}

func NewRelay(ctx context.Context, h host.Host, opts ...RelayOpt) (*Relay, error) {
func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ...RelayOpt) (*Relay, error) {
r := &Relay{
upgrader: upgrader,
host: h,
ctx: ctx,
self: h.ID(),
Expand Down Expand Up @@ -126,7 +130,13 @@ func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore
return nil, RelayError{msg.GetCode()}
}

return &Conn{Stream: s, remote: dest, transport: r.Transport()}, nil
return &Conn{Stream: s, remote: dest}, nil
}

func (r *Relay) Matches(addr ma.Multiaddr) bool {
// TODO: Look at the prefix transport as well.
_, err := addr.ValueForProtocol(P_CIRCUIT)
return err == nil
}

func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {
Expand All @@ -142,18 +152,18 @@ func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {

msg.Type = pb.CircuitRelay_CAN_HOP.Enum()

err = wr.WriteMsg(&msg)
if err != nil {
if err := wr.WriteMsg(&msg); err != nil {
s.Reset()
return false, err
}

msg.Reset()

err = rd.ReadMsg(&msg)
s.Close()

if err != nil {
if err := rd.ReadMsg(&msg); err != nil {
s.Reset()
return false, err
}
if err := inet.FullClose(s); err != nil {
return false, err
}

Expand Down Expand Up @@ -340,7 +350,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
}

select {
case r.incoming <- &Conn{Stream: s, remote: src, transport: r.Transport()}:
case r.incoming <- &Conn{Stream: s, remote: src}:
case <-time.After(RelayAcceptTimeout):
r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED)
}
Expand All @@ -359,7 +369,7 @@ func (r *Relay) handleCanHop(s inet.Stream, msg *pb.CircuitRelay) {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
inet.FullClose(s)
}
}

Expand All @@ -370,7 +380,7 @@ func (r *Relay) handleError(s inet.Stream, code pb.CircuitRelay_Status) {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
s.Close()
inet.FullClose(s)
}
}

Expand Down
Loading

0 comments on commit 2dea38c

Please sign in to comment.