Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
use the ResourceManager
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 1, 2022
1 parent 1c17791 commit c6ace78
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 12 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.16
require (
github.com/ipfs/go-log/v2 v2.3.0
github.com/libp2p/go-conn-security-multistream v0.3.0
github.com/libp2p/go-libp2p-core v0.10.0
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231121257-5fcc7b607478
github.com/libp2p/go-libp2p-mplex v0.4.1
github.com/libp2p/go-libp2p-testing v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220101094616-076e334df9ce
github.com/libp2p/go-netroute v0.1.5 // indirect
github.com/libp2p/go-reuseport v0.1.0
github.com/libp2p/go-reuseport-transport v0.1.0
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.10.0 h1:jFy7v5Muq58GTeYkPhGzIH8Qq4BFfziqc0ixPd/pP9k=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231121257-5fcc7b607478 h1:V4qhaW1u7hJ54vditELylooLRTTcc7wmirh0yPgcxaA=
github.com/libp2p/go-libp2p-core v0.13.1-0.20211231121257-5fcc7b607478/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
Expand All @@ -214,8 +215,8 @@ github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod
github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0=
github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OMbRi0/QsvE=
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 h1:7SDl3O2+AYOgfE40Mis83ClpfGNkNA6m4FwhbOHs+iI=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0/go.mod h1:Rc+XODlB3yce7dvFV4q/RmyJGsFcCZRkeZMu/Zdg0mo=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220101094616-076e334df9ce h1:c/1P03KzVcucwj8CaREviCNpYHLYfyuKQzFLT80BnZ4=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220101094616-076e334df9ce/go.mod h1:gRl3evtqJSAjlPvzYGVp3zBPWd0/ni/DUhB3wA67Krg=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
github.com/libp2p/go-mplex v0.3.0 h1:U1T+vmCYJaEoDJPV1aq31N56hS+lJgb397GsylNSgrU=
github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
Expand Down
23 changes: 20 additions & 3 deletions tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func DisableReuseport() Option {
return nil
}
}

func WithConnectionTimeout(d time.Duration) Option {
return func(tr *TcpTransport) error {
tr.connectTimeout = d
Expand All @@ -114,17 +115,23 @@ type TcpTransport struct {
// TCP connect timeout
connectTimeout time.Duration

rcmgr network.ResourceManager

reuse rtpt.Transport
}

var _ transport.Transport = &TcpTransport{}

// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire TCP stack (though it might not necessarily be).
func NewTCPTransport(upgrader *tptu.Upgrader, opts ...Option) (*TcpTransport, error) {
func NewTCPTransport(upgrader *tptu.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*TcpTransport, error) {
if rcmgr == nil {
rcmgr = network.NullResourceManager
}
tr := &TcpTransport{
Upgrader: upgrader,
connectTimeout: defaultConnectTimeout, // can be set by using the WithConnectionTimeout option
rcmgr: rcmgr,
}
for _, o := range opts {
if err := o(tr); err != nil {
Expand Down Expand Up @@ -159,8 +166,17 @@ func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Co

// Dial dials the peer at the remote address.
func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
connScope, err := t.rcmgr.OpenConnection(network.DirOutbound, true)
if err != nil {
return nil, err
}
if err := connScope.SetPeer(p); err != nil {
connScope.Done()
return nil, err
}
conn, err := t.maDial(ctx, raddr)
if err != nil {
connScope.Done()
return nil, err
}
// Set linger to 0 so we never get stuck in the TIME-WAIT state. When
Expand All @@ -170,13 +186,14 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID)
tryKeepAlive(conn, true)
c, err := newTracingConn(conn, true)
if err != nil {
connScope.Done()
return nil, err
}
direction := network.DirOutbound
if ok, isClient, _ := network.GetSimultaneousConnect(ctx); ok && !isClient {
direction = network.DirInbound
}
return t.Upgrader.Upgrade(ctx, t, c, direction, p)
return t.Upgrader.Upgrade(ctx, t, c, direction, p, connScope)
}

// UseReuseport returns true if reuseport is enabled and available.
Expand All @@ -198,7 +215,7 @@ func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
return nil, err
}
list = newTracingListener(&tcpListener{list, 0})
return t.Upgrader.UpgradeListener(t, list), nil
return t.Upgrader.UpgradeListener(t, list, t.rcmgr), nil
}

// Protocols returns the list of terminal protocols this transport can dial.
Expand Down
8 changes: 4 additions & 4 deletions tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ func TestTcpTransport(t *testing.T) {
ta, err := NewTCPTransport(&tptu.Upgrader{
Secure: ia,
Muxer: new(mplex.Transport),
})
}, nil)
require.NoError(t, err)
tb, err := NewTCPTransport(&tptu.Upgrader{
Secure: ib,
Muxer: new(mplex.Transport),
})
}, nil)
require.NoError(t, err)

zero := "/ip4/127.0.0.1/tcp/0"
Expand All @@ -50,7 +50,7 @@ func TestTcpTransportCantDialDNS(t *testing.T) {
tpt, err := NewTCPTransport(&tptu.Upgrader{
Secure: sm,
Muxer: new(mplex.Transport),
})
}, nil)
require.NoError(t, err)

if tpt.CanDial(dnsa) {
Expand All @@ -71,7 +71,7 @@ func TestTcpTransportCantListenUtp(t *testing.T) {
tpt, err := NewTCPTransport(&tptu.Upgrader{
Secure: sm,
Muxer: new(mplex.Transport),
})
}, nil)
require.NoError(t, err)

_, err = tpt.Listen(utpa)
Expand Down

0 comments on commit c6ace78

Please sign in to comment.