From 1de208c84b89ca6689b9a109d786a6de4ef84e14 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 19 Dec 2021 19:39:16 +0400 Subject: [PATCH] add constructor options for timeout, stop using transport.DialTimeout --- dial_test.go | 52 +++++++++++++++++++++++----------------------- limiter.go | 20 ++++++------------ swarm.go | 41 ++++++++++++++++++++++++++++-------- swarm_dial.go | 17 +++++++++------ testing/testing.go | 11 ++++++++++ 5 files changed, 86 insertions(+), 55 deletions(-) diff --git a/dial_test.go b/dial_test.go index 5fe4bdc2..a3448640 100644 --- a/dial_test.go +++ b/dial_test.go @@ -14,7 +14,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" testutil "github.com/libp2p/go-libp2p-core/test" - "github.com/libp2p/go-libp2p-core/transport" swarmt "github.com/libp2p/go-libp2p-swarm/testing" "github.com/libp2p/go-libp2p-testing/ci" @@ -24,10 +23,6 @@ import ( "github.com/stretchr/testify/require" ) -func init() { - transport.DialTimeout = time.Second -} - func closeSwarms(swarms []*Swarm) { for _, s := range swarms { s.Close() @@ -161,8 +156,9 @@ func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) { func TestDialWait(t *testing.T) { t.Parallel() - ctx := context.Background() - swarms := makeSwarms(t, 1) + const dialTimeout = 250 * time.Millisecond + + swarms := makeSwarms(t, 1, swarmt.DialTimeout(dialTimeout)) s1 := swarms[0] defer s1.Close() @@ -173,7 +169,7 @@ func TestDialWait(t *testing.T) { s1.Peerstore().AddAddr(s2p, s2addr, peerstore.PermanentAddrTTL) before := time.Now() - if c, err := s1.DialPeer(ctx, s2p); err == nil { + if c, err := s1.DialPeer(context.Background(), s2p); err == nil { defer c.Close() t.Fatal("error swarm dialing to unknown peer worked...", err) } else { @@ -181,11 +177,11 @@ func TestDialWait(t *testing.T) { } duration := time.Since(before) - if duration < transport.DialTimeout*DialAttempts { - t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts) + if duration < dialTimeout*DialAttempts { + t.Error("< dialTimeout * DialAttempts not being respected", duration, dialTimeout*DialAttempts) } - if duration > 2*transport.DialTimeout*DialAttempts { - t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) + if duration > 2*dialTimeout*DialAttempts { + t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts) } if !s1.Backoff().Backoff(s2p, s2addr) { @@ -194,15 +190,16 @@ func TestDialWait(t *testing.T) { } func TestDialBackoff(t *testing.T) { - // t.Skip("skipping for another test") if ci.IsRunning() { t.Skip("travis will never have fun with this test") } t.Parallel() + const dialTimeout = 250 * time.Millisecond + ctx := context.Background() - swarms := makeSwarms(t, 2) + swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout)) s1 := swarms[0] s2 := swarms[1] defer s1.Close() @@ -269,8 +266,8 @@ func TestDialBackoff(t *testing.T) { s3done := dialOfflineNode(s3p, N) // when all dials should be done by: - dialTimeout1x := time.After(transport.DialTimeout) - dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10) + dialTimeout1x := time.After(dialTimeout) + dialTimeout10Ax := time.After(dialTimeout * 2 * 10) // DialAttempts * 10) // 2) all dials should hang select { @@ -352,8 +349,8 @@ func TestDialBackoff(t *testing.T) { s3done := dialOfflineNode(s3p, N) // when all dials should be done by: - dialTimeout1x := time.After(transport.DialTimeout) - dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10) + dialTimeout1x := time.After(dialTimeout) + dialTimeout10Ax := time.After(dialTimeout * 2 * 10) // DialAttempts * 10) // 7) s3 dials should all return immediately (except 1) for i := 0; i < N-1; i++ { @@ -405,11 +402,12 @@ func TestDialBackoff(t *testing.T) { } func TestDialBackoffClears(t *testing.T) { - // t.Skip("skipping for another test") t.Parallel() + const dialTimeout = 250 * time.Millisecond + ctx := context.Background() - swarms := makeSwarms(t, 2) + swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout)) s1 := swarms[0] s2 := swarms[1] defer s1.Close() @@ -433,11 +431,11 @@ func TestDialBackoffClears(t *testing.T) { } duration := time.Since(before) - if duration < transport.DialTimeout*DialAttempts { - t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts) + if duration < dialTimeout*DialAttempts { + t.Error("< dialTimeout * DialAttempts not being respected", duration, dialTimeout*DialAttempts) } - if duration > 2*transport.DialTimeout*DialAttempts { - t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) + if duration > 2*dialTimeout*DialAttempts { + t.Error("> 2*dialTimeout * DialAttempts not being respected", duration, 2*dialTimeout*DialAttempts) } if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { @@ -561,7 +559,9 @@ func TestDialSimultaneousJoin(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - swarms := makeSwarms(t, 2) + const dialTimeout = 250 * time.Millisecond + + swarms := makeSwarms(t, 2, swarmt.DialTimeout(dialTimeout)) s1 := swarms[0] s2 := swarms[1] defer s1.Close() @@ -654,7 +654,7 @@ func TestDialSimultaneousJoin(t *testing.T) { if c1 != c2 { t.Fatal("expected c1 and c2 to be the same") } - case <-time.After(2 * transport.DialTimeout): + case <-time.After(2 * dialTimeout): t.Fatal("no connection from first dial") } } diff --git a/limiter.go b/limiter.go index 5cd8cfa2..ac72279b 100644 --- a/limiter.go +++ b/limiter.go @@ -20,25 +20,17 @@ type dialResult struct { } type dialJob struct { - addr ma.Multiaddr - peer peer.ID - ctx context.Context - resp chan dialResult + addr ma.Multiaddr + peer peer.ID + ctx context.Context + resp chan dialResult + timeout time.Duration } func (dj *dialJob) cancelled() bool { return dj.ctx.Err() != nil } -func (dj *dialJob) dialTimeout() time.Duration { - timeout := transport.DialTimeout - if lowTimeoutFilters.AddrBlocked(dj.addr) { - timeout = DialTimeoutLocal - } - - return timeout -} - type dialLimiter struct { lk sync.Mutex @@ -221,7 +213,7 @@ func (dl *dialLimiter) executeDial(j *dialJob) { return } - dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout()) + dctx, cancel := context.WithTimeout(j.ctx, j.timeout) defer cancel() con, err := dl.dialFunc(dctx, j.peer, j.addr) diff --git a/swarm.go b/swarm.go index b1f51a35..47617907 100644 --- a/swarm.go +++ b/swarm.go @@ -21,11 +21,15 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -// DialTimeoutLocal is the maximum duration a Dial to local network address -// is allowed to take. -// This includes the time between dialing the raw network connection, -// protocol selection as well the handshake, if applicable. -var DialTimeoutLocal = 5 * time.Second +const ( + defaultDialTimeout = 15 * time.Second + + // defaultDialTimeoutLocal is the maximum duration a Dial to local network address + // is allowed to take. + // This includes the time between dialing the raw network connection, + // protocol selection as well the handshake, if applicable. + defaultDialTimeoutLocal = 5 * time.Second +) var log = logging.Logger("swarm2") @@ -58,6 +62,20 @@ func WithMetrics(reporter metrics.Reporter) Option { } } +func WithDialTimeout(t time.Duration) Option { + return func(s *Swarm) error { + s.dialTimeout = t + return nil + } +} + +func WithDialTimeoutLocal(t time.Duration) Option { + return func(s *Swarm) error { + s.dialTimeoutLocal = t + return nil + } +} + // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -73,6 +91,9 @@ type Swarm struct { local peer.ID peers peerstore.Peerstore + dialTimeout time.Duration + dialTimeoutLocal time.Duration + conns struct { sync.RWMutex m map[peer.ID][]*Conn @@ -117,10 +138,12 @@ type Swarm struct { func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm, error) { ctx, cancel := context.WithCancel(context.Background()) s := &Swarm{ - local: local, - peers: peers, - ctx: ctx, - ctxCancel: cancel, + local: local, + peers: peers, + ctx: ctx, + ctxCancel: cancel, + dialTimeout: defaultDialTimeout, + dialTimeoutLocal: defaultDialTimeoutLocal, } s.conns.m = make(map[peer.ID][]*Conn) diff --git a/swarm_dial.go b/swarm_dial.go index 80d6958a..df5f0d94 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -279,10 +279,10 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { return nil, err } -/////////////////////////////////////////////////////////////////////////////////// +// ///////////////////////////////////////////////////////////////////////////////// // lo and behold, The Dialer // TODO explain how all this works -////////////////////////////////////////////////////////////////////////////////// +// //////////////////////////////////////////////////////////////////////////////// type dialRequest struct { ctx context.Context @@ -664,11 +664,16 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul // it is able, respecting the various different types of rate // limiting that occur without using extra goroutines per addr func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) { + timeout := s.dialTimeout + if lowTimeoutFilters.AddrBlocked(a) && s.dialTimeoutLocal < s.dialTimeout { + timeout = s.dialTimeoutLocal + } s.limiter.AddDialJob(&dialJob{ - addr: a, - peer: p, - resp: resp, - ctx: ctx, + addr: a, + peer: p, + resp: resp, + ctx: ctx, + timeout: timeout, }) } diff --git a/testing/testing.go b/testing/testing.go index e2ffabe2..0ed32735 100644 --- a/testing/testing.go +++ b/testing/testing.go @@ -2,6 +2,7 @@ package testing import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -31,6 +32,7 @@ type config struct { dialOnly bool disableTCP bool disableQUIC bool + dialTimeout time.Duration connectionGater connmgr.ConnectionGater sk crypto.PrivKey } @@ -72,6 +74,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option { } } +func DialTimeout(t time.Duration) Option { + return func(_ *testing.T, c *config) { + c.dialTimeout = t + } +} + // GenUpgrader creates a new connection upgrader for use with this swarm. func GenUpgrader(n *swarm.Swarm) *tptu.Upgrader { id := n.LocalPeer() @@ -120,6 +128,9 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm { if cfg.connectionGater != nil { swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater)) } + if cfg.dialTimeout != 0 { + swarmOpts = append(swarmOpts, swarm.WithDialTimeout(cfg.dialTimeout)) + } s, err := swarm.NewSwarm(p.ID, ps, swarmOpts...) require.NoError(t, err)