Skip to content

Commit

Permalink
upstream: fix #389, handle quic conn errors better
Browse files Browse the repository at this point in the history
  • Loading branch information
ameshkov committed Mar 24, 2024
1 parent 0dbc5e8 commit 60b21f0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 71 deletions.
108 changes: 41 additions & 67 deletions upstream/doq.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,32 +150,40 @@ func (p *dnsOverQUIC) Exchange(m *dns.Msg) (resp *dns.Msg, err error) {
}
}()

// Check if there was already an active conn before sending the request.
// We'll only attempt to re-connect if there was one.
hasConnection := p.hasConnection()
// Gets or opens a QUIC connection to use for this query.
conn, cached, err := p.getConnection()
if err != nil {
return nil, fmt.Errorf("getting conn: %w", err)
}

// Make the first attempt to send the DNS query.
resp, err = p.exchangeQUIC(m)

// Make up to 2 attempts to re-open the QUIC connection and send the request
// again. There are several cases where this workaround is necessary to
// make DoQ usable. We need to make 2 attempts in the case when the
// connection was closed (due to inactivity for example) AND the server
// refuses to open a 0-RTT connection.
for i := 0; hasConnection && isQUICRetryError(err) && i < 2; i++ {
resp, err = p.exchangeQUIC(m, conn)

// Failure to use a cached connection should be handled gracefully as this
// connection could have been closed by the server or simply be broken due
// to how UDP NAT works. In this case the connection should be re-created.
if cached && err != nil {
log.Debug("dnsproxy: re-creating the QUIC connection and retrying due to %v", err)

// Close the active connection to make sure we'll try to re-connect.
p.closeConnWithError(err)
// Close the active connection to make sure the cached connection is
// cleaned up.
p.closeConnWithError(conn, err)

// Retry sending the request.
resp, err = p.exchangeQUIC(m)
// Get or re-create the QUIC connection in order to make the second
// attempt.
conn, _, err = p.getConnection()
if err != nil {
return nil, err
}

// Retry sending the request through the new connection.
resp, err = p.exchangeQUIC(m, conn)
}

if err != nil {
// If we're unable to exchange messages, make sure the connection is
// closed and signal about an internal error.
p.closeConnWithError(err)
p.closeConnWithError(conn, err)
}

return resp, err
Expand All @@ -195,19 +203,14 @@ func (p *dnsOverQUIC) Close() (err error) {
return err
}

// exchangeQUIC attempts to open a QUIC connection, send the DNS message
// exchangeQUIC attempts to open a new QUIC stream, send the DNS message
// through it and return the response it got from the server.
func (p *dnsOverQUIC) exchangeQUIC(req *dns.Msg) (resp *dns.Msg, err error) {
func (p *dnsOverQUIC) exchangeQUIC(req *dns.Msg, conn quic.Connection) (resp *dns.Msg, err error) {
addr := p.Address()

logBegin(addr, networkUDP, req)
defer func() { logFinish(addr, networkUDP, err) }()

conn, err := p.getConnection(true)
if err != nil {
return nil, err
}

buf, err := req.Pack()
if err != nil {
return nil, fmt.Errorf("failed to pack DNS message for DoQ: %w", err)
Expand Down Expand Up @@ -260,45 +263,27 @@ func (p *dnsOverQUIC) getBytesPool() (pool *sync.Pool) {
return p.bytesPool
}

// getConnection opens or returns an existing quic.Connection. useCached
// argument controls whether we should try to use the existing cached
// connection. If it is false, we will forcibly create a new connection and
// close the existing one if needed.
func (p *dnsOverQUIC) getConnection(useCached bool) (c quic.Connection, err error) {
// getConnection opens or returns an existing quic.Connection and indicates
// whether it opened a new connection or used an existing cached one.
func (p *dnsOverQUIC) getConnection() (c quic.Connection, cached bool, err error) {
var conn quic.Connection

p.connMu.Lock()
defer p.connMu.Unlock()

conn = p.conn
if conn != nil {
if useCached {
return conn, nil
}

// We're recreating the connection, let's create a new one.
err = conn.CloseWithError(QUICCodeNoError, "")
if err != nil {
log.Debug("dnsproxy: closing stale connection: %s", err)
}
return conn, true, nil
}

conn, err = p.openConnection()
if err != nil {
return nil, err
return nil, false, err
}

p.conn = conn

return conn, nil
}

// hasConnection returns true if there's an active QUIC connection.
func (p *dnsOverQUIC) hasConnection() (ok bool) {
p.connMu.Lock()
defer p.connMu.Unlock()

return p.conn != nil
return conn, false, nil
}

// getQUICConfig returns the QUIC config in a thread-safe manner. Note, that
Expand Down Expand Up @@ -327,20 +312,10 @@ func (p *dnsOverQUIC) openStream(conn quic.Connection) (quic.Stream, error) {

stream, err := conn.OpenStreamSync(ctx)
if err != nil {
log.Debug("dnsproxy: opening quic stream: %s", err)
} else {
return stream, nil
return nil, fmt.Errorf("failed to open a QUIC stream: %w", err)
}

// We can get here if the old QUIC connection is not valid anymore. We
// should try to re-create the connection again in this case.
newConn, err := p.getConnection(false)
if err != nil {
return nil, err
}

// Open a new stream.
return newConn.OpenStreamSync(ctx)
return stream, nil
}

// openConnection dials a new QUIC connection.
Expand Down Expand Up @@ -385,15 +360,10 @@ func (p *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {
// closeConnWithError closes the active connection with error to make sure that
// new queries were processed in another connection. We can do that in the case
// of a fatal error.
func (p *dnsOverQUIC) closeConnWithError(err error) {
func (p *dnsOverQUIC) closeConnWithError(conn quic.Connection, err error) {
p.connMu.Lock()
defer p.connMu.Unlock()

if p.conn == nil {
// Do nothing, there's no active conn anyways.
return
}

code := QUICCodeNoError
if err != nil {
code = QUICCodeInternalError
Expand All @@ -404,11 +374,15 @@ func (p *dnsOverQUIC) closeConnWithError(err error) {
p.resetQUICConfig()
}

err = p.conn.CloseWithError(code, "")
err = conn.CloseWithError(code, "")
if err != nil {
log.Error("dnsproxy: failed to close the conn: %v", err)
}
p.conn = nil

// If the connection that's being closed it cached, reset the cache.
if p.conn == conn {
p.conn = nil
}
}

// readMsg reads the incoming DNS message from the QUIC stream.
Expand Down
10 changes: 6 additions & 4 deletions upstream/doq_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestUpstreamDoQ(t *testing.T) {
if conn == nil {
conn = uq.conn
} else {
// This way we test that the conn is properly reused.
// This way we test that the connection is properly reused.
require.Equal(t, conn, uq.conn)
}
}
Expand Down Expand Up @@ -110,6 +110,8 @@ func TestUpstreamDoQ_serverCloseConn(t *testing.T) {

for i := 0; i < 10; i++ {
go func() {
t.Helper()

defer wg.Done()

req := createTestMessage()
Expand Down Expand Up @@ -230,14 +232,14 @@ type testDoQServer struct {
// listener is the QUIC connections listener.
listener *quic.EarlyListener

// addr is the address that this server listens to.
addr string

// conns is the list of connections that are currently active.
conns map[quic.EarlyConnection]struct{}

// connsMu protects conns.
connsMu *sync.Mutex

// addr is the address that this server listens to.
addr string
}

// Shutdown stops the test server.
Expand Down

0 comments on commit 60b21f0

Please sign in to comment.