Skip to content

Commit

Permalink
upstream: several improvements in DoH3 and DoQ upstreams
Browse files Browse the repository at this point in the history
The previous implementation weren't able to properly handle a situation when the
server was restarted. This commit greatly improves the overall stability.
  • Loading branch information
ameshkov committed Sep 19, 2022
1 parent 59cf92b commit 79f47f5
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 132 deletions.
24 changes: 12 additions & 12 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ type Proxy struct {
// Listeners
// --

udpListen []*net.UDPConn // UDP listen connections
tcpListen []net.Listener // TCP listeners
tlsListen []net.Listener // TLS listeners
quicListen []quic.Listener // QUIC listeners
httpsListen []net.Listener // HTTPS listeners
httpsServer *http.Server // HTTPS server instance
h3Listen []*net.UDPConn // HTTP/3 listeners
h3Server *http3.Server // HTTP/3 server instance
dnsCryptUDPListen []*net.UDPConn // UDP listen connections for DNSCrypt
dnsCryptTCPListen []net.Listener // TCP listeners for DNSCrypt
dnsCryptServer *dnscrypt.Server // DNSCrypt server instance
udpListen []*net.UDPConn // UDP listen connections
tcpListen []net.Listener // TCP listeners
tlsListen []net.Listener // TLS listeners
quicListen []quic.EarlyListener // QUIC listeners
httpsListen []net.Listener // HTTPS listeners
httpsServer *http.Server // HTTPS server instance
h3Listen []quic.EarlyListener // HTTP/3 listeners
h3Server *http3.Server // HTTP/3 server instance
dnsCryptUDPListen []*net.UDPConn // UDP listen connections for DNSCrypt
dnsCryptTCPListen []net.Listener // TCP listeners for DNSCrypt
dnsCryptServer *dnscrypt.Server // DNSCrypt server instance

// Upstream
// --
Expand Down Expand Up @@ -240,7 +240,7 @@ func (p *Proxy) Stop() error {
closeAll([]io.Closer{p.httpsServer}, &errs)
p.httpsServer = nil

// No need to close these since they're closed by httpsServer.
// No need to close these since they're closed by httpsServer.Close().
p.httpsListen = nil
}

Expand Down
4 changes: 3 additions & 1 deletion proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net"
"time"

"github.com/lucas-clemente/quic-go"

"github.com/AdguardTeam/golibs/log"
"github.com/miekg/dns"
)
Expand Down Expand Up @@ -58,7 +60,7 @@ func (p *Proxy) startListeners() error {
}

for _, l := range p.h3Listen {
go func(l *net.UDPConn) { _ = p.h3Server.Serve(l) }(l)
go func(l quic.EarlyListener) { _ = p.h3Server.ServeListener(l) }(l)
}

for _, l := range p.quicListen {
Expand Down
20 changes: 11 additions & 9 deletions proxy/server_https.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"golang.org/x/net/http2"
)

// listenHTTP creates an instance of an HTTPS server that supports HTTP/1.1
// and HTTP/2. Returns the address the listener actually listens to (useful
// listenHTTP creates instances of TLS listeners that will be used to run an
// H1/H2 server. Returns the address the listener actually listens to (useful
// in the case if port 0 is specified).
func (p *Proxy) listenHTTP(addr *net.TCPAddr) (laddr *net.TCPAddr, err error) {
tcpListen, err := net.ListenTCP("tcp", addr)
Expand All @@ -37,14 +37,18 @@ func (p *Proxy) listenHTTP(addr *net.TCPAddr) (laddr *net.TCPAddr, err error) {
return tcpListen.Addr().(*net.TCPAddr), nil
}

// listenH3 creates an instance of an HTTP/3 server.
// listenH3 creates instances of QUIC listeners that will be used for running
// an HTTP/3 server.
func (p *Proxy) listenH3(addr *net.UDPAddr) (err error) {
udpListen, err := net.ListenUDP("udp", addr)
tlsConfig := p.TLSConfig.Clone()
tlsConfig.NextProtos = []string{"h3"}
quicListen, err := quic.ListenAddrEarly(addr.String(), tlsConfig, &quic.Config{})
if err != nil {
return fmt.Errorf("tcp listener: %w", err)
return fmt.Errorf("quic listener: %w", err)
}
log.Info("Listening to h3://%s", udpListen.LocalAddr())
p.h3Listen = append(p.h3Listen, udpListen)
log.Info("Listening to h3://%s", quicListen.Addr())

p.h3Listen = append(p.h3Listen, quicListen)

return nil
}
Expand All @@ -66,8 +70,6 @@ func (p *Proxy) createHTTPSListeners() (err error) {
proxy: p,
h3: true,
},
TLSConfig: p.TLSConfig.Clone(),
QuicConfig: &quic.Config{},
}
}

Expand Down
13 changes: 9 additions & 4 deletions proxy/server_quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ func (p *Proxy) createQUICListeners() error {
log.Info("Creating a QUIC listener")
tlsConfig := p.TLSConfig.Clone()
tlsConfig.NextProtos = compatProtoDQ
quicListen, err := quic.ListenAddr(a.String(), tlsConfig, &quic.Config{MaxIdleTimeout: maxQUICIdleTimeout})
quicListen, err := quic.ListenAddrEarly(
a.String(),
tlsConfig,
&quic.Config{MaxIdleTimeout: maxQUICIdleTimeout},
)
if err != nil {
return fmt.Errorf("starting quic listener: %w", err)
return fmt.Errorf("quic listener: %w", err)
}

p.quicListen = append(p.quicListen, quicListen)
Expand All @@ -63,13 +67,14 @@ func (p *Proxy) createQUICListeners() error {
// quicPacketLoop listens for incoming QUIC packets.
//
// See also the comment on Proxy.requestGoroutinesSema.
func (p *Proxy) quicPacketLoop(l quic.Listener, requestGoroutinesSema semaphore) {
func (p *Proxy) quicPacketLoop(l quic.EarlyListener, requestGoroutinesSema semaphore) {
log.Info("Entering the DNS-over-QUIC listener loop on %s", l.Addr())
for {
conn, err := l.Accept(context.Background())

if err != nil {
if isQUICNonCrit(err) {
log.Tracef("quic connection closed or timeout: %s", err)
log.Tracef("quic connection closed or timed out: %s", err)
} else {
log.Error("reading from quic listen: %s", err)
}
Expand Down
4 changes: 3 additions & 1 deletion proxy/server_quic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ func TestQuicProxy(t *testing.T) {
addr := dnsProxy.Addr(ProtoQUIC)

// Open QUIC connection.
conn, err := quic.DialAddr(addr.String(), tlsConfig, nil)
conn, err := quic.DialAddrEarly(addr.String(), tlsConfig, nil)
require.NoError(t, err)
defer conn.CloseWithError(DoQCodeNoError, "")

conn.HandshakeComplete()

// Send several test messages.
for i := 0; i < 10; i++ {
sendTestQUICMessage(t, conn, DoQv1)
Expand Down
13 changes: 13 additions & 0 deletions upstream/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,16 @@ func (n *bootstrapper) createDialContext(addresses []string) (dialContext dialHa
return nil, errors.List("all dialers failed", errs...)
}
}

// newContext creates a new context with deadline if needed. If no timeout is
// set cancel would be a simple noop.
func (n *bootstrapper) newContext() (ctx context.Context, cancel context.CancelFunc) {
ctx = context.Background()
cancel = func() {}

if n.options.Timeout > 0 {
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(n.options.Timeout))
}

return ctx, cancel
}
146 changes: 122 additions & 24 deletions upstream/upstream_doh.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"net/http"
"net/url"
"os"
"sync"
"time"

Expand Down Expand Up @@ -74,12 +73,7 @@ func newDoH(uu *url.URL, opts *Options) (u Upstream, err error) {

quicConfig: &quic.Config{
KeepAlivePeriod: QUICKeepAlivePeriod,
// You can read more on address validation here:
// https://datatracker.ietf.org/doc/html/rfc9000#section-8.1
// Setting maxOrigins to 1 and tokensPerOrigin to 10 assuming that
// this is more than enough for the way we use it (one connection
// per upstream).
TokenStore: quic.NewLRUTokenStore(1, 10),
TokenStore: newQUICTokenStore(),
},
}, nil
}
Expand All @@ -88,17 +82,69 @@ func newDoH(uu *url.URL, opts *Options) (u Upstream, err error) {
func (p *dnsOverHTTPS) Address() string { return p.boot.URL.String() }

// Exchange implements the Upstream interface for *dnsOverHTTPS.
func (p *dnsOverHTTPS) Exchange(m *dns.Msg) (*dns.Msg, error) {
func (p *dnsOverHTTPS) Exchange(m *dns.Msg) (resp *dns.Msg, err error) {
// Quote from https://www.rfc-editor.org/rfc/rfc8484.html:
// In order to maximize HTTP cache friendliness, DoH clients using media
// formats that include the ID field from the DNS message header, such
// as "application/dns-message", SHOULD use a DNS ID of 0 in every DNS
// request.
id := m.Id
m.Id = 0
defer func() {
// Restore the original ID to not break compatibility with proxies.
m.Id = id
if resp != nil {
resp.Id = id
}
}()

// Check if there was already an active client before sending the request.
// We'll only attempt to re-connect if there was one.
hasClient := p.hasClient()

// Make the first attempt to send the DNS query.
resp, err = p.exchangeHTTPS(m)

// Make up to 2 attempts to re-create the HTTP client and send the request
// again. There are several cases (mostly, with QUIC) where this workaround
// is necessary to make HTTP client usable. We need to make 2 attempts in
// the case when the connection was closed (due to inactivity for example)
// AND the server refuses to open a 0-RTT connection.
for i := 0; hasClient && p.shouldRetry(err) && i < 2; i++ {
log.Debug("re-creating the HTTP client and retrying due to %v", err)

p.clientGuard.Lock()
p.client = nil
// Re-create the token store to make sure we're not trying to use invalid
// tokens for 0-RTT.
p.quicConfig.TokenStore = newQUICTokenStore()
p.clientGuard.Unlock()

resp, err = p.exchangeHTTPS(m)
}

if err != nil {
// If the request failed anyway, make sure we don't use this client.
p.clientGuard.Lock()
p.client = nil
p.clientGuard.Unlock()
}

return resp, err
}

// exchangeHTTPS creates an HTTP client and sends the DNS query using it.
func (p *dnsOverHTTPS) exchangeHTTPS(m *dns.Msg) (resp *dns.Msg, err error) {
client, err := p.getClient()
if err != nil {
return nil, fmt.Errorf("initializing http client: %w", err)
}

logBegin(p.Address(), m)
r, err := p.exchangeHTTPSClient(m, client)
resp, err = p.exchangeHTTPSClient(m, client)
logFinish(p.Address(), err)

return r, err
return resp, err
}

// exchangeHTTPSClient sends the DNS query to a DoH resolver using the specified
Expand All @@ -125,16 +171,6 @@ func (p *dnsOverHTTPS) exchangeHTTPSClient(m *dns.Msg, client *http.Client) (*dn
defer resp.Body.Close()
}
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
// If this is a timeout error, trying to forcibly re-create the HTTP
// client instance.
//
// See https://github.com/AdguardTeam/AdGuardHome/issues/3217.
p.clientGuard.Lock()
p.client = nil
p.clientGuard.Unlock()
}

return nil, fmt.Errorf("requesting %s: %w", p.boot.URL, err)
}

Expand All @@ -160,6 +196,55 @@ func (p *dnsOverHTTPS) exchangeHTTPSClient(m *dns.Msg, client *http.Client) (*dn
return &response, err
}

// hasClient returns true if this connection already has an active HTTP client.
func (p *dnsOverHTTPS) hasClient() (ok bool) {
p.clientGuard.Lock()
defer p.clientGuard.Unlock()

return p.client != nil
}

// shouldRetry checks what error we have received and returns true if we should
// re-create the HTTP client and retry the request.
func (p *dnsOverHTTPS) shouldRetry(err error) (ok bool) {
if err == nil {
return false
}

var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
// If this is a timeout error, trying to forcibly re-create the HTTP
// client instance.
//
// See https://github.com/AdguardTeam/AdGuardHome/issues/3217.
return true
}

var qAppErr *quic.ApplicationError
if errors.As(err, &qAppErr) && qAppErr.ErrorCode == 0 {
// If this is a QUIC error, we'd better re-create the HTTP client and
// reset the token store. See this example where HTTP3 client does not
// handle dead connections:
// https://github.com/lucas-clemente/quic-go/issues/765
return true
}

var qIdleErr *quic.IdleTimeoutError
if errors.As(err, &qIdleErr) {
// This error means that the connection was closed due to being idle.
// In this case we should forcibly re-create the HTTP client.
return true
}

if errors.Is(err, quic.Err0RTTRejected) {
// Unfortunately, HTTP3 client does not handle these errors:
// https://github.com/lucas-clemente/quic-go/issues/3259
return true
}

return false
}

// getClient gets or lazily initializes an HTTP client (and transport) that will
// be used for this DoH resolver.
func (p *dnsOverHTTPS) getClient() (c *http.Client, err error) {
Expand Down Expand Up @@ -258,6 +343,15 @@ func (p *dnsOverHTTPS) createTransport() (t http.RoundTripper, err error) {
return transport, nil
}

type http3RoundTripper struct {
rt *http3.RoundTripper
}

func (r *http3RoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) {
resp, err = r.rt.RoundTrip(req)
return resp, err
}

// createTransportH3 tries to create an HTTP/3 transport for this upstream.
// We should be able to fall back to H1/H2 in case if HTTP/3 is unavailable or
// if it is too slow. In order to do that, this method will run two probes
Expand All @@ -266,7 +360,7 @@ func (p *dnsOverHTTPS) createTransport() (t http.RoundTripper, err error) {
func (p *dnsOverHTTPS) createTransportH3(
tlsConfig *tls.Config,
dialContext dialHandler,
) (roundTripper *http3.RoundTripper, err error) {
) (roundTripper http.RoundTripper, err error) {
if !p.supportsH3() {
return nil, errors.Error("HTTP3 support is not enabled")
}
Expand All @@ -276,21 +370,25 @@ func (p *dnsOverHTTPS) createTransportH3(
return nil, err
}

return &http3.RoundTripper{
rt := &http3.RoundTripper{
Dial: func(
ctx context.Context,

// Ignore the address and always connect to the one that we got
// from the bootstrapper.
_ string,
tlsCfg *tls.Config,
cfg *quic.Config,
) (c quic.EarlyConnection, err error) {
return quic.DialAddrEarlyContext(ctx, addr, tlsCfg, cfg)
c, err = quic.DialAddrEarlyContext(ctx, addr, tlsCfg, cfg)
return c, err
},
DisableCompression: true,
TLSClientConfig: tlsConfig,
QuicConfig: p.quicConfig,
}, nil
}

return &http3RoundTripper{rt: rt}, nil
}

// probeH3 runs a test to check whether QUIC is faster than TLS for this
Expand Down
Loading

0 comments on commit 79f47f5

Please sign in to comment.