Skip to content

Commit

Permalink
swarm: log unexpected listener errors (#2277)
Browse files Browse the repository at this point in the history
* Log unexpected listener errors

* Use errors.Is
  • Loading branch information
MarcoPolo authored May 11, 2023
1 parent 18c11ad commit 22a70b4
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 18 deletions.
4 changes: 4 additions & 0 deletions core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package transport

import (
"context"
"errors"
"net"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -94,6 +95,9 @@ type Listener interface {
Multiaddr() ma.Multiaddr
}

// ErrListenerClosed is returned by Listener.Accept when the listener is gracefully closed.
var ErrListenerClosed = errors.New("listener closed")

// TransportNetwork is an inet.Network with methods for managing transports.
type TransportNetwork interface {
network.Network
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package swarm

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -127,6 +128,9 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
for {
c, err := list.Accept()
if err != nil {
if !errors.Is(err, transport.ErrListenerClosed) {
log.Errorf("swarm listener for %s accept error: %s", a, err)
}
return
}
canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound")
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package upgrader
import (
"context"
"fmt"
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -165,6 +166,9 @@ func (l *listener) Accept() (transport.CapableConn, error) {
return c, nil
}
}
if strings.Contains(l.err.Error(), "use of closed network connection") {
return nil, transport.ErrListenerClosed
}
return nil, l.err
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestListenerClose(t *testing.T) {
require.NoError(ln.Close())
err := <-errCh
require.Error(err)
require.Contains(err.Error(), "use of closed network connection")
require.Equal(err, transport.ErrListenerClosed)

// doesn't accept new connections when it is closed
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), &network.NullScope{})
Expand Down
4 changes: 2 additions & 2 deletions p2p/protocol/circuitv2/client/listen.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package client

import (
"errors"
"net"

"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
Expand Down Expand Up @@ -33,7 +33,7 @@ func (l *Listener) Accept() (manet.Conn, error) {
return evt.conn, nil

case <-l.ctx.Done():
return nil, errors.New("circuit v2 client closed")
return nil, transport.ErrListenerClosed
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions p2p/transport/quic/virtuallistener.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package libp2pquic

import (
"errors"
"sync"

tpt "github.com/libp2p/go-libp2p/core/transport"
Expand Down Expand Up @@ -30,7 +29,7 @@ func (l *virtualListener) Multiaddr() ma.Multiaddr {
}

func (l *virtualListener) Close() error {
l.acceptRunnner.RmAcceptForVersion(l.version)
l.acceptRunnner.RmAcceptForVersion(l.version, tpt.ErrListenerClosed)
return l.t.CloseVirtualListener(l)
}

Expand Down Expand Up @@ -65,7 +64,7 @@ func (r *acceptLoopRunner) AcceptForVersion(v quic.VersionNumber) chan acceptVal
return ch
}

func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber) {
func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber, err error) {
r.muxerMu.Lock()
defer r.muxerMu.Unlock()

Expand All @@ -78,7 +77,7 @@ func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber) {
if !ok {
panic("expected chan in accept muxer")
}
ch <- acceptVal{err: errors.New("listener Accept closed")}
ch <- acceptVal{err: err}
delete(r.muxer, v)
}

Expand All @@ -104,7 +103,7 @@ func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version
// Check if we have a buffered connection first from an earlier Accept call
case v, ok := <-bufferedConnChan:
if !ok {
return nil, errors.New("listener closed")
return nil, tpt.ErrListenerClosed
}
return v.conn, v.err
default:
Expand Down Expand Up @@ -166,7 +165,7 @@ func (r *acceptLoopRunner) Accept(l *listener, expectedVersion quic.VersionNumbe
}
case v, ok := <-bufferedConnChan:
if !ok {
return nil, errors.New("listener closed")
return nil, tpt.ErrListenerClosed
}
conn = v.conn
err = v.err
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/quicreuse/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -152,8 +153,7 @@ func TestAcceptErrorGetCleanedUp(t *testing.T) {
require.NoError(t, err)
defer l.Close()
_, err = l.Accept(context.Background())
require.EqualError(t, err, "accept goroutine finished")

require.ErrorIs(t, err, transport.ErrListenerClosed)
}

// The connection passed to quic-go needs to be type-assertable to a net.UDPConn,
Expand Down
9 changes: 7 additions & 2 deletions p2p/transport/quicreuse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
"github.com/quic-go/quic-go"
)
Expand Down Expand Up @@ -134,6 +136,9 @@ func (l *connListener) Run() error {
for {
conn, err := l.l.Accept(context.Background())
if err != nil {
if errors.Is(err, quic.ErrServerClosed) || strings.Contains(err.Error(), "use of closed network connection") {
return transport.ErrListenerClosed
}
return err
}
proto := conn.ConnectionState().TLS.NegotiatedProtocol
Expand Down Expand Up @@ -192,10 +197,10 @@ func (l *listener) Accept(ctx context.Context) (quic.Connection, error) {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.acceptLoopRunning:
return nil, errors.New("accept goroutine finished")
return nil, transport.ErrListenerClosed
case c, ok := <-l.queue:
if !ok {
return nil, errors.New("listener closed")
return nil, transport.ErrListenerClosed
}
return c, nil
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/websocket/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (l *listener) Accept() (manet.Conn, error) {
select {
case c, ok := <-l.incoming:
if !ok {
return nil, fmt.Errorf("listener is closed")
return nil, transport.ErrListenerClosed
}

mnc, err := manet.WrapNetConn(c)
Expand All @@ -124,7 +124,7 @@ func (l *listener) Accept() (manet.Conn, error) {

return mnc, nil
case <-l.closed:
return nil, fmt.Errorf("listener is closed")
return nil, transport.ErrListenerClosed
}
}

Expand Down
4 changes: 1 addition & 3 deletions p2p/transport/webtransport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/quic-go/webtransport-go"
)

var errClosed = errors.New("closed")

const queueLen = 16
const handshakeTimeout = 10 * time.Second

Expand Down Expand Up @@ -155,7 +153,7 @@ func (l *listener) httpHandlerWithConnScope(w http.ResponseWriter, r *http.Reque
func (l *listener) Accept() (tpt.CapableConn, error) {
select {
case <-l.ctx.Done():
return nil, errClosed
return nil, tpt.ErrListenerClosed
case c := <-l.queue:
return c, nil
}
Expand Down

0 comments on commit 22a70b4

Please sign in to comment.