Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

add constructor options for timeout, stop using transport.DialTimeout #302

Merged
merged 1 commit into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
testutil "github.com/libp2p/go-libp2p-core/test"
"github.com/libp2p/go-libp2p-core/transport"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p-testing/ci"

Expand All @@ -24,10 +23,6 @@ import (
"github.com/stretchr/testify/require"
)

func init() {
transport.DialTimeout = time.Second
}

func closeSwarms(swarms []*Swarm) {
for _, s := range swarms {
s.Close()
Expand Down Expand Up @@ -161,8 +156,9 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
func TestDialWait(t *testing.T) {
t.Parallel()

ctx := context.Background()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again here, why are we deopping this context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it didn't do anything, it's a context.Background().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

swarms := makeSwarms(t, 1)
const dialTimeout = 250 * time.Millisecond

swarms := makeSwarms(t, 1, swarmt.DialTimeout(dialTimeout))
s1 := swarms[0]
defer s1.Close()

Expand All @@ -173,19 +169,19 @@ func TestDialWait(t *testing.T) {
s1.Peerstore().AddAddr(s2p, s2addr, peerstore.PermanentAddrTTL)

before := time.Now()
if c, err := s1.DialPeer(ctx, s2p); err == nil {
if c, err := s1.DialPeer(context.Background(), s2p); err == nil {
defer c.Close()
t.Fatal("error swarm dialing to unknown peer worked...", err)
} else {
t.Log("correctly got error:", err)
}
duration := time.Since(before)

if duration < transport.DialTimeout*DialAttempts {
t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
if duration < dialTimeout*DialAttempts {
t.Error("< dialTimeout * DialAttempts not being respected", duration, dialTimeout*DialAttempts)
}
if duration > 2*transport.DialTimeout*DialAttempts {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
if duration > 2*dialTimeout*DialAttempts {
t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts)
}

if !s1.Backoff().Backoff(s2p, s2addr) {
Expand All @@ -194,15 +190,16 @@ func TestDialWait(t *testing.T) {
}

func TestDialBackoff(t *testing.T) {
// t.Skip("skipping for another test")
if ci.IsRunning() {
t.Skip("travis will never have fun with this test")
}

t.Parallel()

const dialTimeout = 250 * time.Millisecond

ctx := context.Background()
swarms := makeSwarms(t, 2)
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
Expand Down Expand Up @@ -269,8 +266,8 @@ func TestDialBackoff(t *testing.T) {
s3done := dialOfflineNode(s3p, N)

// when all dials should be done by:
dialTimeout1x := time.After(transport.DialTimeout)
dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
dialTimeout1x := time.After(dialTimeout)
dialTimeout10Ax := time.After(dialTimeout * 2 * 10) // DialAttempts * 10)

// 2) all dials should hang
select {
Expand Down Expand Up @@ -352,8 +349,8 @@ func TestDialBackoff(t *testing.T) {
s3done := dialOfflineNode(s3p, N)

// when all dials should be done by:
dialTimeout1x := time.After(transport.DialTimeout)
dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
dialTimeout1x := time.After(dialTimeout)
dialTimeout10Ax := time.After(dialTimeout * 2 * 10) // DialAttempts * 10)

// 7) s3 dials should all return immediately (except 1)
for i := 0; i < N-1; i++ {
Expand Down Expand Up @@ -405,11 +402,12 @@ func TestDialBackoff(t *testing.T) {
}

func TestDialBackoffClears(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()

const dialTimeout = 250 * time.Millisecond

ctx := context.Background()
swarms := makeSwarms(t, 2)
swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
Expand All @@ -433,11 +431,11 @@ func TestDialBackoffClears(t *testing.T) {
}
duration := time.Since(before)

if duration < transport.DialTimeout*DialAttempts {
t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
if duration < dialTimeout*DialAttempts {
t.Error("< dialTimeout * DialAttempts not being respected", duration, dialTimeout*DialAttempts)
}
if duration > 2*transport.DialTimeout*DialAttempts {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
if duration > 2*dialTimeout*DialAttempts {
t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts)
}

if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
Expand Down Expand Up @@ -561,7 +559,9 @@ func TestDialSimultaneousJoin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

swarms := makeSwarms(t, 2)
const dialTimeout = 250 * time.Millisecond

swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout))
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestDialSimultaneousJoin(t *testing.T) {
if c1 != c2 {
t.Fatal("expected c1 and c2 to be the same")
}
case <-time.After(2 * transport.DialTimeout):
case <-time.After(2 * dialTimeout):
t.Fatal("no connection from first dial")
}
}
Expand Down
20 changes: 6 additions & 14 deletions limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,17 @@ type dialResult struct {
}

type dialJob struct {
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
timeout time.Duration
}

func (dj *dialJob) cancelled() bool {
return dj.ctx.Err() != nil
}

func (dj *dialJob) dialTimeout() time.Duration {
timeout := transport.DialTimeout
if lowTimeoutFilters.AddrBlocked(dj.addr) {
timeout = DialTimeoutLocal
}

return timeout
}

type dialLimiter struct {
lk sync.Mutex

Expand Down Expand Up @@ -221,7 +213,7 @@ func (dl *dialLimiter) executeDial(j *dialJob) {
return
}

dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
dctx, cancel := context.WithTimeout(j.ctx, j.timeout)
defer cancel()

con, err := dl.dialFunc(dctx, j.peer, j.addr)
Expand Down
41 changes: 32 additions & 9 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

// DialTimeoutLocal is the maximum duration a Dial to local network address
// is allowed to take.
// This includes the time between dialing the raw network connection,
// protocol selection as well the handshake, if applicable.
var DialTimeoutLocal = 5 * time.Second
const (
defaultDialTimeout = 15 * time.Second

// defaultDialTimeoutLocal is the maximum duration a Dial to local network address
// is allowed to take.
// This includes the time between dialing the raw network connection,
// protocol selection as well the handshake, if applicable.
defaultDialTimeoutLocal = 5 * time.Second
)

var log = logging.Logger("swarm2")

Expand Down Expand Up @@ -58,6 +62,20 @@ func WithMetrics(reporter metrics.Reporter) Option {
}
}

func WithDialTimeout(t time.Duration) Option {
return func(s *Swarm) error {
s.dialTimeout = t
return nil
}
}

func WithDialTimeoutLocal(t time.Duration) Option {
return func(s *Swarm) error {
s.dialTimeoutLocal = t
return nil
}
}

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
Expand All @@ -73,6 +91,9 @@ type Swarm struct {
local peer.ID
peers peerstore.Peerstore

dialTimeout time.Duration
dialTimeoutLocal time.Duration

conns struct {
sync.RWMutex
m map[peer.ID][]*Conn
Expand Down Expand Up @@ -117,10 +138,12 @@ type Swarm struct {
func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm, error) {
ctx, cancel := context.WithCancel(context.Background())
s := &Swarm{
local: local,
peers: peers,
ctx: ctx,
ctxCancel: cancel,
local: local,
peers: peers,
ctx: ctx,
ctxCancel: cancel,
dialTimeout: defaultDialTimeout,
dialTimeoutLocal: defaultDialTimeoutLocal,
}

s.conns.m = make(map[peer.ID][]*Conn)
Expand Down
17 changes: 11 additions & 6 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,10 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, err
}

///////////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////////
// lo and behold, The Dialer
// TODO explain how all this works
//////////////////////////////////////////////////////////////////////////////////
// ////////////////////////////////////////////////////////////////////////////////

type dialRequest struct {
ctx context.Context
Expand Down Expand Up @@ -664,11 +664,16 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
timeout := s.dialTimeout
if lowTimeoutFilters.AddrBlocked(a) && s.dialTimeoutLocal < s.dialTimeout {
timeout = s.dialTimeoutLocal
}
s.limiter.AddDialJob(&dialJob{
addr: a,
peer: p,
resp: resp,
ctx: ctx,
addr: a,
peer: p,
resp: resp,
ctx: ctx,
timeout: timeout,
})
}

Expand Down
11 changes: 11 additions & 0 deletions testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testing

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -31,6 +32,7 @@ type config struct {
dialOnly bool
disableTCP bool
disableQUIC bool
dialTimeout time.Duration
connectionGater connmgr.ConnectionGater
sk crypto.PrivKey
}
Expand Down Expand Up @@ -72,6 +74,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option {
}
}

func DialTimeout(t time.Duration) Option {
return func(_ *testing.T, c *config) {
c.dialTimeout = t
}
}

// GenUpgrader creates a new connection upgrader for use with this swarm.
func GenUpgrader(n *swarm.Swarm) *tptu.Upgrader {
id := n.LocalPeer()
Expand Down Expand Up @@ -120,6 +128,9 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
if cfg.connectionGater != nil {
swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater))
}
if cfg.dialTimeout != 0 {
swarmOpts = append(swarmOpts, swarm.WithDialTimeout(cfg.dialTimeout))
}
s, err := swarm.NewSwarm(p.ID, ps, swarmOpts...)
require.NoError(t, err)

Expand Down