Skip to content

Commit

Permalink
Merge pull request #16 from denis-tingajkin/fix_issue_14
Browse files Browse the repository at this point in the history
Fix i/o timeout error
  • Loading branch information
haiodo authored Apr 7, 2020
2 parents ecfc24e + 733881a commit 47a5b58
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 18 deletions.
4 changes: 4 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,7 @@ issues:
- path: "fanout_test.go"
linters:
- gosec
# Tests may be long.
- path: "setup_test.go"
linters:
- gocyclo
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Each incoming DNS query that hits the CoreDNS fanout plugin will be replicated i
* `worker-count` is the number of parallel queries per request. By default equals to count of IP list. Use this only for reducing parallel queries per request.
* `network` is a specific network protocol. Could be `tcp`, `udp`, `tcp-tls`.
* `except` is a list is a space-separated list of domains to exclude from proxying.

* `attempt-count` is the number of attempts to connect to upstream servers that are needed before considering an upstream to be down. If 0, the upstream will never be marked as down and request will be finished by `timeout`. Default is `3`.
* `timeout` is the timeout of request. After this period, attempts to receive a response from the upstream servers will be stopped. Default is `30s`.
## Metrics

If monitoring is enabled (via the *prometheus* plugin) then the following metric are exported:
Expand Down
1 change: 0 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (c *client) Request(r *request.Request) (*dns.Msg, error) {
for {
ret, err = conn.ReadMsg()
if err != nil {
logErrIfNotNil(err)
return nil, err
}
if r.Req.Id == ret.Id {
Expand Down
1 change: 1 addition & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
maxTimeout = 2 * time.Second
defaultTimeout = 30 * time.Second
readTimeout = 2 * time.Second
attemptDelay = time.Millisecond * 100
tcptlc = "tcp-tls"
tcp = "tcp"
udp = "udp"
Expand Down
26 changes: 23 additions & 3 deletions fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fanout
import (
"context"
"crypto/tls"
"errors"
"time"

"github.com/coredns/coredns/plugin"
Expand All @@ -36,8 +37,10 @@ type Fanout struct {
tlsConfig *tls.Config
ignored []string
tlsServerName string
timeout time.Duration
net string
from string
attempts int
workerCount int
Next plugin.Handler
}
Expand All @@ -47,6 +50,8 @@ func New() *Fanout {
return &Fanout{
tlsConfig: new(tls.Config),
net: "udp",
attempts: 3,
timeout: defaultTimeout,
}
}

Expand Down Expand Up @@ -80,9 +85,7 @@ func (f *Fanout) ServeDNS(ctx context.Context, w dns.ResponseWriter, m *dns.Msg)
for i := 0; i < f.workerCount; i++ {
go func() {
for c := range workerChannel {
start := time.Now()
msg, err := c.Request(&request.Request{W: w, Req: m})
responseCh <- &response{client: c, response: msg, start: start, err: err}
responseCh <- f.processClient(timeoutContext, c, &request.Request{W: w, Req: m})
}
}()
}
Expand Down Expand Up @@ -149,3 +152,20 @@ func (f *Fanout) isAllowedDomain(name string) bool {
}
return true
}

func (f *Fanout) processClient(ctx context.Context, c Client, r *request.Request) *response {
start := time.Now()
for j := 0; j < f.attempts || f.attempts == 0; <-time.After(attemptDelay) {
if ctx.Err() != nil {
return &response{client: c, response: nil, start: start, err: ctx.Err()}
}
msg, err := c.Request(r)
if err == nil {
return &response{client: c, response: msg, start: start, err: err}
}
if f.attempts != 0 {
j++
}
}
return &response{client: c, response: nil, start: start, err: errors.New("attempt limit has been reached")}
}
32 changes: 32 additions & 0 deletions fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -241,6 +242,37 @@ func (t *fanoutTestSuite) TestCanReturnUnsuccessfulRepose() {
t.Equal(writer.answers[0].MsgHdr.Rcode, dns.RcodeNameError, "fanout plugin returns first negative answer if other answers on request are negative")
}

func (t *fanoutTestSuite) TestBusyServer() {
var requestNum, answerCount int32
totalRequestNum := int32(5)
s := newServer(t.network, func(w dns.ResponseWriter, r *dns.Msg) {
if atomic.LoadInt32(&requestNum)%2 == 0 {
// server is busy
} else if r.Question[0].Name == testQuery {
msg := dns.Msg{
Answer: []dns.RR{makeRecordA("example1 3600 IN A 10.0.0.1")},
}
atomic.AddInt32(&answerCount, 1)
msg.SetReply(r)
logErrIfNotNil(w.WriteMsg(&msg))
}
atomic.AddInt32(&requestNum, 1)
})
c := NewClient(s.addr, t.network)
f := New()
f.net = t.network
f.from = "."
f.attempts = 0
f.addClient(c)
req := new(dns.Msg)
req.SetQuestion(testQuery, dns.TypeA)
for i := int32(0); i < totalRequestNum; i++ {
_, err := f.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
t.Nil(err)
}
t.Equal(totalRequestNum, atomic.LoadInt32(&answerCount))
}

func (t *fanoutTestSuite) TestTwoServers() {
const expected = 1
var mutex sync.Mutex
Expand Down
17 changes: 17 additions & 0 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fanout
import (
"strconv"
"strings"
"time"

"github.com/caddyserver/caddy"
"github.com/caddyserver/caddy/caddyfile"
Expand Down Expand Up @@ -152,13 +153,29 @@ func parseValue(v string, f *Fanout, c *caddyfile.Dispenser) error {
return parseTLSServer(f, c)
case "worker-count":
return parseWorkerCount(f, c)
case "timeout":
return parseTimeout(f, c)
case "except":
return parseIgnored(f, c)
case "attempt-count":
num, err := parsePositiveInt(c)
f.attempts = num
return err
default:
return errors.Errorf("unknown property %v", v)
}
}

func parseTimeout(f *Fanout, c *caddyfile.Dispenser) error {
if !c.NextArg() {
return c.ArgErr()
}
var err error
val := c.Val()
f.timeout, err = time.ParseDuration(val)
return err
}

func parseIgnored(f *Fanout, c *caddyfile.Dispenser) error {
ignore := c.RemainingArgs()
if len(ignore) == 0 {
Expand Down
34 changes: 21 additions & 13 deletions setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,29 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/caddyserver/caddy"
)

func TestSetup(t *testing.T) {
tests := []struct {
input string
expectedFrom string
expectedTo []string
expectedIgnored []string
expectedFails int
expectedWorkers int
expectedNetwork string
expectedErr string
input string
expectedFrom string
expectedTo []string
expectedIgnored []string
expectedWorkers int
expectedAttempts int
expectedTimeout time.Duration
expectedNetwork string
expectedErr string
}{
// positive
{input: "fanout . 127.0.0.1", expectedFrom: ".", expectedFails: 2, expectedWorkers: 1, expectedNetwork: "udp"},
{input: "fanout . 127.0.0.1 {\nexcept a b\nworker-count 3\n}", expectedFrom: ".", expectedFails: 2, expectedWorkers: 1, expectedIgnored: []string{"a.", "b."}, expectedNetwork: "udp"},
{input: "fanout . 127.0.0.1 127.0.0.2 {\nnetwork tcp\n}", expectedFrom: ".", expectedFails: 0, expectedWorkers: 2, expectedNetwork: "tcp", expectedTo: []string{"127.0.0.1:53", "127.0.0.2:53"}},
{input: "fanout . 127.0.0.1 127.0.0.2 127.0.0.3 127.0.0.4 {\nworker-count 3\n}", expectedFrom: ".", expectedFails: 2, expectedWorkers: 3, expectedNetwork: "udp"},
{input: "fanout . 127.0.0.1", expectedFrom: ".", expectedAttempts: 3, expectedWorkers: 1, expectedTimeout: defaultTimeout, expectedNetwork: "udp"},
{input: "fanout . 127.0.0.1 {\nexcept a b\nworker-count 3\n}", expectedFrom: ".", expectedTimeout: defaultTimeout, expectedAttempts: 3, expectedWorkers: 1, expectedIgnored: []string{"a.", "b."}, expectedNetwork: "udp"},
{input: "fanout . 127.0.0.1 127.0.0.2 {\nnetwork tcp\n}", expectedFrom: ".", expectedTimeout: defaultTimeout, expectedAttempts: 3, expectedWorkers: 2, expectedNetwork: "tcp", expectedTo: []string{"127.0.0.1:53", "127.0.0.2:53"}},
{input: "fanout . 127.0.0.1 127.0.0.2 127.0.0.3 127.0.0.4 {\nworker-count 3\ntimeout 1m\n}", expectedTimeout: time.Minute, expectedAttempts: 3, expectedFrom: ".", expectedWorkers: 3, expectedNetwork: "udp"},
{input: "fanout . 127.0.0.1 127.0.0.2 127.0.0.3 127.0.0.4 {\nattempt-count 2\n}", expectedTimeout: defaultTimeout, expectedFrom: ".", expectedAttempts: 2, expectedWorkers: 4, expectedNetwork: "udp"},

// negative
{input: "fanout . aaa", expectedErr: "not an IP address or file"},
Expand All @@ -63,7 +66,12 @@ func TestSetup(t *testing.T) {
}
continue
}

if f.timeout != test.expectedTimeout {
t.Fatalf("Test %d: expected: %d, got: %d", i, test.expectedTimeout, f.timeout)
}
if f.attempts != test.expectedAttempts {
t.Fatalf("Test %d: expected: %d, got: %d", i, test.expectedAttempts, f.attempts)
}
if f.from != test.expectedFrom && test.expectedFrom != "" {
t.Fatalf("Test %d: expected: %s, got: %s", i, test.expectedFrom, f.from)
}
Expand Down

0 comments on commit 47a5b58

Please sign in to comment.