Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: log unexpected listener errors #2277

Merged
merged 2 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package transport

import (
"context"
"errors"
"net"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -94,6 +95,9 @@ type Listener interface {
Multiaddr() ma.Multiaddr
}

// ErrListenerClosed is returned by Listener.Accept when the listener is gracefully closed.
var ErrListenerClosed = errors.New("listener closed")

// TransportNetwork is an inet.Network with methods for managing transports.
type TransportNetwork interface {
network.Network
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package swarm

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -127,6 +128,9 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
for {
c, err := list.Accept()
if err != nil {
if !errors.Is(err, transport.ErrListenerClosed) {
log.Errorf("swarm listener for %s accept error: %s", a, err)
}
return
}
canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound")
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package upgrader
import (
"context"
"fmt"
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -165,6 +166,9 @@ func (l *listener) Accept() (transport.CapableConn, error) {
return c, nil
}
}
if strings.Contains(l.err.Error(), "use of closed network connection") {
return nil, transport.ErrListenerClosed
}
return nil, l.err
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestListenerClose(t *testing.T) {
require.NoError(ln.Close())
err := <-errCh
require.Error(err)
require.Contains(err.Error(), "use of closed network connection")
require.Equal(err, transport.ErrListenerClosed)

// doesn't accept new connections when it is closed
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), &network.NullScope{})
Expand Down
4 changes: 2 additions & 2 deletions p2p/protocol/circuitv2/client/listen.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package client

import (
"errors"
"net"

"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
Expand Down Expand Up @@ -33,7 +33,7 @@ func (l *Listener) Accept() (manet.Conn, error) {
return evt.conn, nil

case <-l.ctx.Done():
return nil, errors.New("circuit v2 client closed")
return nil, transport.ErrListenerClosed
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions p2p/transport/quic/virtuallistener.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package libp2pquic

import (
"errors"
"sync"

tpt "github.com/libp2p/go-libp2p/core/transport"
Expand Down Expand Up @@ -30,7 +29,7 @@ func (l *virtualListener) Multiaddr() ma.Multiaddr {
}

func (l *virtualListener) Close() error {
l.acceptRunnner.RmAcceptForVersion(l.version)
l.acceptRunnner.RmAcceptForVersion(l.version, tpt.ErrListenerClosed)
return l.t.CloseVirtualListener(l)
}

Expand Down Expand Up @@ -64,15 +63,15 @@ func (r *acceptLoopRunner) AcceptForVersion(v quic.VersionNumber) chan acceptVal
return ch
}

func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber) {
func (r *acceptLoopRunner) RmAcceptForVersion(v quic.VersionNumber, err error) {
r.muxerMu.Lock()
defer r.muxerMu.Unlock()

ch, ok := r.muxer[v]
if !ok {
panic("expected chan in accept muxer")
}
ch <- acceptVal{err: errors.New("listener Accept closed")}
ch <- acceptVal{err: err}
delete(r.muxer, v)
}

Expand All @@ -97,7 +96,7 @@ func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version
// Check if we have a buffered connection first from an earlier Accept call
case v, ok := <-bufferedConnChan:
if !ok {
return nil, errors.New("listener closed")
return nil, tpt.ErrListenerClosed
}
return v.conn, v.err
default:
Expand Down Expand Up @@ -159,7 +158,7 @@ func (r *acceptLoopRunner) Accept(l *listener, expectedVersion quic.VersionNumbe
}
case v, ok := <-bufferedConnChan:
if !ok {
return nil, errors.New("listener closed")
return nil, tpt.ErrListenerClosed
}
conn = v.conn
err = v.err
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/quicreuse/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -152,8 +153,7 @@ func TestAcceptErrorGetCleanedUp(t *testing.T) {
require.NoError(t, err)
defer l.Close()
_, err = l.Accept(context.Background())
require.EqualError(t, err, "accept goroutine finished")

require.ErrorIs(t, err, transport.ErrListenerClosed)
}

// The connection passed to quic-go needs to be type-assertable to a net.UDPConn,
Expand Down
9 changes: 7 additions & 2 deletions p2p/transport/quicreuse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
"github.com/quic-go/quic-go"
)
Expand Down Expand Up @@ -134,6 +136,9 @@ func (l *connListener) Run() error {
for {
conn, err := l.l.Accept(context.Background())
if err != nil {
if errors.Is(err, quic.ErrServerClosed) || strings.Contains(err.Error(), "use of closed network connection") {
return transport.ErrListenerClosed
}
return err
}
proto := conn.ConnectionState().TLS.NegotiatedProtocol
Expand Down Expand Up @@ -192,10 +197,10 @@ func (l *listener) Accept(ctx context.Context) (quic.Connection, error) {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.acceptLoopRunning:
return nil, errors.New("accept goroutine finished")
return nil, transport.ErrListenerClosed
case c, ok := <-l.queue:
if !ok {
return nil, errors.New("listener closed")
return nil, transport.ErrListenerClosed
}
return c, nil
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/websocket/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (l *listener) Accept() (manet.Conn, error) {
select {
case c, ok := <-l.incoming:
if !ok {
return nil, fmt.Errorf("listener is closed")
return nil, transport.ErrListenerClosed
}

mnc, err := manet.WrapNetConn(c)
Expand All @@ -151,7 +151,7 @@ func (l *listener) Accept() (manet.Conn, error) {

return mnc, nil
case <-l.closed:
return nil, fmt.Errorf("listener is closed")
return nil, transport.ErrListenerClosed
}
}

Expand Down
4 changes: 1 addition & 3 deletions p2p/transport/webtransport/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/quic-go/webtransport-go"
)

var errClosed = errors.New("closed")

const queueLen = 16
const handshakeTimeout = 10 * time.Second

Expand Down Expand Up @@ -155,7 +153,7 @@ func (l *listener) httpHandlerWithConnScope(w http.ResponseWriter, r *http.Reque
func (l *listener) Accept() (tpt.CapableConn, error) {
select {
case <-l.ctx.Done():
return nil, errClosed
return nil, tpt.ErrListenerClosed
case c := <-l.queue:
return c, nil
}
Expand Down