Skip to content

Commit

Permalink
Fix server close operation
Browse files Browse the repository at this point in the history
First close the connection with the peers
and after close the listeners
  • Loading branch information
pappz committed Dec 6, 2024
1 parent 5b163e8 commit ef3fd5c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 20 deletions.
7 changes: 7 additions & 0 deletions relay/client/dialer/net/err.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package net

import "errors"

var (
ErrClosedByServer = errors.New("closed by server")
)
16 changes: 13 additions & 3 deletions relay/client/dialer/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package quic

import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/quic-go/quic-go"
log "github.com/sirupsen/logrus"

netErr "github.com/netbirdio/netbird/relay/client/dialer/net"
)

const (
Expand Down Expand Up @@ -41,8 +44,7 @@ func NewConn(session quic.Connection) net.Conn {
func (c *Conn) Read(b []byte) (n int, err error) {
dgram, err := c.session.ReceiveDatagram(c.ctx)
if err != nil {
log.Errorf("failed to read from QUIC session: %v", err)
return 0, err
return 0, c.remoteCloseErrHandling(err)
}

n = copy(b, dgram)
Expand All @@ -52,6 +54,7 @@ func (c *Conn) Read(b []byte) (n int, err error) {
func (c *Conn) Write(b []byte) (int, error) {
err := c.session.SendDatagram(b)
if err != nil {
err = c.remoteCloseErrHandling(err)
log.Errorf("failed to write to QUIC stream: %v", err)
return 0, err
}
Expand All @@ -78,10 +81,17 @@ func (c *Conn) SetWriteDeadline(t time.Time) error {
}

func (c *Conn) SetDeadline(t time.Time) error {

return nil
}

func (c *Conn) Close() error {
return c.session.CloseWithError(0, "normal closure")
}

func (c *Conn) remoteCloseErrHandling(err error) error {
var appErr *quic.ApplicationError
if errors.As(err, &appErr) && appErr.ErrorCode == 0x0 {
return netErr.ErrClosedByServer
}
return err
}
1 change: 1 addition & 0 deletions relay/client/dialer/ws/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewConn(wsConn *websocket.Conn, serverAddress string) net.Conn {
func (c *Conn) Read(b []byte) (n int, err error) {
t, ioReader, err := c.Conn.Reader(c.ctx)
if err != nil {
// todo use ErrClosedByServer
return 0, err
}

Expand Down
21 changes: 7 additions & 14 deletions relay/server/listener/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,20 @@ func NewConn(session quic.Connection) *Conn {
}

func (c *Conn) Read(b []byte) (n int, err error) {
if c.isClosed() {
return 0, net.ErrClosed
}

dgram, err := c.session.ReceiveDatagram(c.ctx)
if err != nil {
return 0, c.ioErrHandling(err)
return 0, c.remoteCloseErrHandling(err)
}
// Copy data to b, ensuring we don’t exceed the size of b
n = copy(b, dgram)
return n, nil
}

func (c *Conn) Write(b []byte) (int, error) {
err := c.session.SendDatagram(b)
return len(b), err
if err := c.session.SendDatagram(b); err != nil {
return 0, c.remoteCloseErrHandling(err)
}
return len(b), nil
}

func (c *Conn) LocalAddr() net.Addr {
Expand Down Expand Up @@ -88,19 +86,14 @@ func (c *Conn) isClosed() bool {
return c.closed
}

func (c *Conn) ioErrHandling(err error) error {
func (c *Conn) remoteCloseErrHandling(err error) error {
if c.isClosed() {
return net.ErrClosed
}

// Handle QUIC-specific errors
if err == nil {
return nil
}

// Check if the connection was closed remotely
var appErr *quic.ApplicationError
if errors.As(err, &appErr) && appErr.ErrorCode == 0 { // 0 is normal closure
if errors.As(err, &appErr) && appErr.ErrorCode == 0x0 {
return net.ErrClosed
}

Expand Down
4 changes: 3 additions & 1 deletion relay/server/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (r *Relay) Accept(conn net.Conn) {
func (r *Relay) Shutdown(ctx context.Context) {
log.Infof("close connection with all peers")
r.closeMu.Lock()
defer r.closeMu.Unlock()

wg := sync.WaitGroup{}
peers := r.store.Peers()
for _, peer := range peers {
Expand All @@ -161,7 +163,7 @@ func (r *Relay) Shutdown(ctx context.Context) {
}
wg.Wait()
r.metricsCancel()
r.closeMu.Unlock()
r.closed = true
}

// InstanceURL returns the instance URL of the relay server
Expand Down
4 changes: 2 additions & 2 deletions relay/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ func (r *Server) Listen(cfg ListenerConfig) error {
// Shutdown stops the relay server. If there are active connections, they will be closed gracefully. In case of a context,
// the connections will be forcefully closed.
func (r *Server) Shutdown(ctx context.Context) error {
r.relay.Shutdown(ctx)

var multiErr *multierror.Error
for _, l := range r.listeners {
if err := l.Shutdown(ctx); err != nil {
multiErr = multierror.Append(multiErr, err)
}
}

r.relay.Shutdown(ctx)
return nberrors.FormatErrorOrNil(multiErr)
}

Expand Down

0 comments on commit ef3fd5c

Please sign in to comment.