Skip to content

Commit

Permalink
add fanout plugin
Browse files Browse the repository at this point in the history
Signed-off-by: denis-tingajkin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Mar 12, 2020
1 parent c2aa248 commit 7def468
Show file tree
Hide file tree
Showing 16 changed files with 1,702 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

# Dependency directories (remove the comment below to include it)
# vendor/
.idea
102 changes: 101 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,102 @@
# fanout
Repository for the coredns fanout plugin used by Network Service Mesh

## Name

*fanout* - proxies DNS messages to upstream resolvers in parallel.

## Description

Each incoming DNS query that hits the CoreDNS fanout plugin will be replicated in parallel to each listed IP (i.e. the DNS servers). The first non-negative response from any of the queried DNS Servers will be forwarded as a response to the application's DNS request.

## Syntax

* `tls` **CERT** **KEY** **CA** define the TLS properties for TLS connection. From 0 to 3 arguments can be
provided with the meaning as described below
* `tls` - no client authentication is used, and the system CAs are used to verify the server certificate
* `tls` **CA** - no client authentication is used, and the file CA is used to verify the server certificate
* `tls` **CERT** **KEY** - client authentication is used with the specified cert/key pair.
The server certificate is verified with the system CAs
* `tls` **CERT** **KEY** **CA** - client authentication is used with the specified cert/key pair.
The server certificate is verified using the specified CA file
* `tls_servername` **NAME** allows you to set a server name in the TLS configuration; for instance 9.9.9.9
needs this to be set to `dns.quad9.net`. Multiple upstreams are still allowed in this scenario,
but they have to use the same `tls_servername`. E.g. mixing 9.9.9.9 (Quad9) with 1.1.1.1
(Cloudflare) will not work.

* `worker-count` is the number of parallel queries per request. By default it equals the number of listed IPs. Use this only to reduce the number of parallel queries per request.
* `network` is a specific network protocol. Could be `tcp`, `udp`, `tcp-tls`.
* `except` is a space-separated list of domains to exclude from proxying.

## Metrics

If monitoring is enabled (via the *prometheus* plugin) then the following metrics are exported:

* `coredns_fanout_request_duration_seconds{to}` - duration per upstream interaction.
* `coredns_fanout_request_count_total{to}` - query count per upstream.
* `coredns_fanout_response_rcode_count_total{to, rcode}` - count of RCODEs per upstream.
* `coredns_fanout_healthcheck_failure_count_total{to}` - number of failed health checks per upstream.
* `coredns_fanout_healthcheck_broken_count_total{}` - counter of when all upstreams are unhealthy,
and we are randomly (this always uses the `random` policy) spraying to an upstream.

Where `to` is one of the upstream servers (**TO** from the config), `rcode` is the returned RCODE
from the upstream.

## Examples
Proxy all requests within `example.org.` to nameservers running on different ports. The first positive response from a proxy will be provided as the result.

~~~ corefile
example.org {
fanout . 127.0.0.1:9005 127.0.0.1:9006 127.0.0.1:9007 127.0.0.1:9008
}
~~~

Sends parallel requests to three resolvers via TCP, one of which has an IPv6 address. The first response from a proxy will be provided as the result.

~~~ corefile
. {
fanout . 10.0.0.10:53 10.0.0.11:1053 [2003::1]:53 {
network TCP
}
}
~~~

Proxying everything except requests to `example.org`

~~~ corefile
. {
fanout . 10.0.0.10:1234 {
except example.org
}
}
~~~

Proxy everything except `example.org` using the host's `resolv.conf`'s nameservers:

~~~ corefile
. {
fanout . /etc/resolv.conf {
except example.org
}
}
~~~

Proxy all requests to 9.9.9.9 using the DNS-over-TLS protocol.
Note the `tls-server` is mandatory if you want a working setup, as 9.9.9.9 can't be
used in the TLS negotiation.

~~~ corefile
. {
fanout . tls://9.9.9.9 {
tls-server dns.quad9.net
}
}
~~~

Sends parallel requests to five resolvers via UDP, using two workers and without attempting to reconnect. The first positive response from a proxy will be provided as the result.
~~~ corefile
. {
fanout . 10.0.0.10:53 10.0.0.11:53 10.0.0.12:53 10.0.0.13:1053 10.0.0.14:1053 {
worker-count 2
}
}
~~~
83 changes: 83 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package fanout

import (
"crypto/tls"
"fmt"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
"time"
)

// Client represents the proxy for a single remote DNS server.
// The fanout handler holds one Client per configured upstream.
type Client interface {
// Request sends the DNS query carried by the given request to the remote
// server and returns the server's reply, or an error.
Request(request.Request) (*dns.Msg, error)
// Endpoint returns the address of the remote DNS server.
Endpoint() string
// SetTLSConfig installs the TLS configuration to use for connections to
// the remote server.
SetTLSConfig(*tls.Config)
}

// client is the Client implementation returned by NewClient.
type client struct {
// transport dials connections to the remote server and receives the TLS
// configuration passed to SetTLSConfig.
transport Transport
// addr is the remote server address; it is returned by Endpoint and used
// as a metrics label in Request.
addr string
// net is the network name passed to transport.Dial. SetTLSConfig switches
// it to "tcp-tls" when given a non-nil TLS configuration.
net string
}

// NewClient builds a Client for the DNS server at addr that talks over the
// given network. A fresh transport for addr is created via NewTransport.
func NewClient(addr, net string) Client {
	return &client{
		transport: NewTransport(addr),
		net:       net,
		addr:      addr,
	}
}

// SetTLSConfig hands cfg to the underlying transport. When cfg is non-nil the
// client's network is also switched to "tcp-tls" so that subsequent dials use
// TLS; a nil cfg leaves the network untouched.
func (c *client) SetTLSConfig(cfg *tls.Config) {
	if useTLS := cfg != nil; useTLS {
		c.net = tcptlc
	}
	c.transport.SetTLSConfig(cfg)
}

// Endpoint returns the address of the DNS server this client talks to, i.e.
// the addr value the client was constructed with.
func (c *client) Endpoint() string {
return c.addr
}

// Request performs one query/response exchange with the remote DNS server.
//
// It dials a new connection on the client's network, writes the query under a
// write deadline of maxTimeout, then reads replies under a read deadline of
// readTimeout until one carries the same message ID as the query. The
// connection is closed before returning. Dial, write and read errors are
// returned to the caller (write and read errors are also logged).
//
// On success the request count, rcode count and request duration metrics are
// updated for this upstream; failed exchanges do not touch the metrics.
func (c *client) Request(request request.Request) (*dns.Msg, error) {
	begin := time.Now()

	conn, err := c.transport.Dial(c.net)
	if err != nil {
		return nil, err
	}
	defer func() {
		logErrIfNotNil(conn.Close())
	}()

	logErrIfNotNil(conn.SetWriteDeadline(time.Now().Add(maxTimeout)))
	if err = conn.WriteMsg(request.Req); err != nil {
		logErrIfNotNil(err)
		return nil, err
	}

	logErrIfNotNil(conn.SetReadDeadline(time.Now().Add(readTimeout)))
	var reply *dns.Msg
	// Skip any reply whose ID does not match the query; only a read error
	// (including the read deadline expiring) ends the wait early.
	for matched := false; !matched; {
		if reply, err = conn.ReadMsg(); err != nil {
			logErrIfNotNil(err)
			return nil, err
		}
		matched = reply.Id == request.Req.Id
	}

	// Fall back to the numeric code for rcodes without a registered name.
	rcodeLabel, known := dns.RcodeToString[reply.Rcode]
	if !known {
		rcodeLabel = fmt.Sprint(reply.Rcode)
	}
	RequestCount.WithLabelValues(c.addr).Add(1)
	RcodeCount.WithLabelValues(rcodeLabel, c.addr).Add(1)
	RequestDuration.WithLabelValues(c.addr).Observe(time.Since(begin).Seconds())
	return reply, nil
}
36 changes: 36 additions & 0 deletions connect_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package fanout

import (
"github.com/miekg/dns"
"time"
)

// response is the outcome of one upstream exchange, as produced by the worker
// goroutines in ServeDNS.
type response struct {
// client is the upstream client that performed the exchange.
client Client
// response is the reply message returned by client.Request.
response *dns.Msg
// start is the time taken just before client.Request was called.
start time.Time
// err is the error returned by client.Request, if any.
err error
}

// isBetter reports whether right is a strictly better candidate than left.
//
// The checks are applied in order and the first that applies decides:
// a nil right is never better; anything beats a nil left; a right with an
// error is never better; an error-free right beats a left with an error;
// a right without a message is never better; a right with a message beats a
// left without one. When both carry messages, right is better only if its
// rcode is NOERROR and left's is not.
func isBetter(left, right *response) bool {
	switch {
	case right == nil:
		return false
	case left == nil:
		return true
	case right.err != nil:
		return false
	case left.err != nil:
		return true
	case right.response == nil:
		return false
	case left.response == nil:
		return true
	}
	leftOK := left.response.MsgHdr.Rcode == dns.RcodeSuccess
	rightOK := right.response.MsgHdr.Rcode == dns.RcodeSuccess
	return rightOK && !leftOK
}
13 changes: 13 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package fanout

import "time"

// Package-wide limits and timeouts.
const (
// NOTE(review): maxIPCount, maxWorkerCount and minWorkerCount are not
// referenced in the code visible here; presumably they bound the upstream
// list and worker-count option during configuration parsing — confirm.
maxIPCount = 100
maxWorkerCount = 32
minWorkerCount = 2
// maxTimeout is the write deadline applied by client.Request before the
// query is written.
maxTimeout = 2 * time.Second
// defaultTimeout bounds the whole fan-out for one query in ServeDNS.
defaultTimeout = 30 * time.Second
// readTimeout is the read deadline applied by client.Request while waiting
// for the matching reply.
readTimeout = 2 * time.Second
// tcptlc is the network name selected by client.SetTLSConfig when a TLS
// configuration is installed.
tcptlc = "tcp-tls"
)
134 changes: 134 additions & 0 deletions fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package fanout

import (
"context"
"crypto/tls"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/debug"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
"time"
)

// log is the package logger, created with the plugin name "fanout".
var log = clog.NewWithPlugin("fanout")

// Fanout represents a plugin instance that can do async requests to list of DNS servers.
type Fanout struct {
// clients are the upstream clients queried for every matching request.
clients []Client
// tlsConfig is initialized to an empty tls.Config by New.
// NOTE(review): its consumers are not visible in this file — presumably it
// is populated by the setup code and handed to clients; confirm.
tlsConfig *tls.Config
// ignored lists domain names excluded from proxying (see isAllowedDomain).
ignored []string
// NOTE(review): tlsServerName is not read in the code visible here;
// presumably set from the configuration — confirm.
tlsServerName string
// net is the network name reported to dnstap by ServeDNS; New sets it to
// "udp".
net string
// from is the zone name a query must match to be handled by this plugin.
from string
// workerCount is the number of worker goroutines started per query. It is
// incremented once for every client added through addClient.
workerCount int
// Next is the next handler in the plugin chain, used when a query does not
// match this plugin.
Next plugin.Handler
}

// New creates a Fanout plugin instance with its defaults applied: an empty
// TLS configuration and "udp" as the network. All other fields are left at
// their zero values.
func New() *Fanout {
	f := new(Fanout)
	f.net = "udp"
	f.tlsConfig = new(tls.Config)
	return f
}

// addClient registers p as an additional upstream and raises the worker count
// by one, so that by default there is one worker per upstream.
func (f *Fanout) addClient(p Client) {
	f.workerCount++
	f.clients = append(f.clients, p)
}

// Name implements plugin.Handler. It returns the constant plugin name
// "fanout".
func (f *Fanout) Name() string {
return "fanout"
}

// ServeDNS implements plugin.Handler.
//
// Queries that do not match this plugin's zone, or that target an excluded
// domain, are passed to the next handler. Every other query is sent to all
// configured upstream clients by f.workerCount worker goroutines, and the
// response chosen by getFanoutResult is written back to w. The whole fan-out
// is bounded by defaultTimeout.
//
// It returns dns.RcodeServerFailure together with the context error when no
// upstream answered in time, or together with the upstream error when the
// chosen result is an error. If the chosen reply does not match the question,
// a FORMERR reply is written instead. In both written-reply cases the return
// code is 0 and the error is whatever toDnstap reported.
func (f *Fanout) ServeDNS(ctx context.Context, w dns.ResponseWriter, m *dns.Msg) (int, error) {
	req := request.Request{W: w, Req: m}
	if !f.match(req) {
		return plugin.NextOrFailure(f.Name(), f.Next, ctx, w, m)
	}
	timeoutContext, cancel := context.WithTimeout(ctx, defaultTimeout)
	defer cancel()
	clientCount := len(f.clients)
	workerChannel := make(chan Client, f.workerCount)
	// responseCh can hold one response per client, so workers never block on
	// send even after this function has returned.
	responseCh := make(chan *response, clientCount)
	go func() {
		// Closing the channel once every client has been queued ends the
		// workers' range loops. Without it each worker would block forever on
		// the drained channel, leaking f.workerCount goroutines per query.
		defer close(workerChannel)
		for i := 0; i < clientCount; i++ {
			workerChannel <- f.clients[i]
		}
	}()
	for i := 0; i < f.workerCount; i++ {
		go func() {
			for c := range workerChannel {
				start := time.Now()
				msg, err := c.Request(request.Request{W: w, Req: m})
				responseCh <- &response{client: c, response: msg, start: start, err: err}
			}
		}()
	}
	result := f.getFanoutResult(timeoutContext, responseCh)
	if result == nil {
		return dns.RcodeServerFailure, timeoutContext.Err()
	}
	if result.err != nil {
		return dns.RcodeServerFailure, result.err
	}
	dnsTAP := toDnstap(ctx, result.client.Endpoint(), f.net, req, result.response, result.start)
	if !req.Match(result.response) {
		// The upstream answered a different question: reply with FORMERR
		// rather than forwarding the mismatched message.
		debug.Hexdumpf(result.response, "Wrong reply for id: %d, %s %d", result.response.Id, req.QName(), req.QType())
		formerr := new(dns.Msg)
		formerr.SetRcode(req.Req, dns.RcodeFormatError)
		logErrIfNotNil(w.WriteMsg(formerr))
		return 0, dnsTAP
	}
	logErrIfNotNil(w.WriteMsg(result.response))
	return 0, dnsTAP
}

// getFanoutResult collects upstream responses from responseCh and selects the
// one to serve.
//
// It returns immediately with the first response that has no error and a
// NOERROR rcode. Otherwise it tracks the best response seen so far, as ranked
// by isBetter, and returns it once one response per configured client has
// arrived or ctx is done. The result is nil if ctx is done before any
// response was received.
func (f *Fanout) getFanoutResult(ctx context.Context, responseCh <-chan *response) *response {
	var best *response
	pending := len(f.clients)
	for {
		select {
		case <-ctx.Done():
			return best
		case got := <-responseCh:
			pending--
			if isBetter(best, got) {
				best = got
			}
			if pending == 0 {
				return best
			}
			// A successful answer short-circuits the wait for the
			// remaining upstreams.
			if got.err == nil && got.response.Rcode == dns.RcodeSuccess {
				return got
			}
		}
	}
}

// match reports whether the query in state should be handled by this plugin:
// its name must fall under the configured zone and must not be excluded by
// isAllowedDomain.
func (f *Fanout) match(state request.Request) bool {
	inZone := plugin.Name(f.from).Matches(state.Name())
	return inZone && f.isAllowedDomain(state.Name())
}

// isAllowedDomain reports whether name may be proxied. A name equal to the
// configured zone itself is always allowed; otherwise the name is rejected if
// it matches any entry of the ignored list.
func (f *Fanout) isAllowedDomain(name string) bool {
	if dns.Name(f.from) == dns.Name(name) {
		return true
	}
	for i := range f.ignored {
		if excluded := plugin.Name(f.ignored[i]); excluded.Matches(name) {
			return false
		}
	}
	return true
}
Loading

0 comments on commit 7def468

Please sign in to comment.