Skip to content

Commit

Permalink
tcp_sink: don't hang on Flush() if reconnecting and add Options
Browse files Browse the repository at this point in the history
This fixes a bug where the TCP Sink would hang indefinitely on Flush()
if it was unable to reconnect.

It also adds the SinkOption interface for configuring the TCP Sink,
previously could only be done via environment variables (it still
defaults to using env vars). This changes the function signature of
NewTCPStatsdSink(), but the change should not be breaking since the new
"opts" argument is variadic.

This also significantly increases the code coverage of the TCP Sink
tests.
  • Loading branch information
Charlie Vieth committed Mar 31, 2020
1 parent 59f0da0 commit 1b56639
Show file tree
Hide file tree
Showing 4 changed files with 568 additions and 11 deletions.
90 changes: 79 additions & 11 deletions tcp_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
logger "github.com/sirupsen/logrus"
)

// TODO(btc): add constructor that accepts functional options in order to allow
// users to choose the constants that work best for them. (Leave the existing
// c'tor for backwards compatibility)
// e.g. `func NewTCPStatsdSinkWithOptions(opts ...Option) Sink`

const (
defaultRetryInterval = time.Second * 3
defaultDialTimeout = defaultRetryInterval / 2
Expand All @@ -30,19 +25,65 @@ const (
chanSize = approxMaxMemBytes / defaultBufferSize
)

// An SinkOption configures a Sink.
type SinkOption interface {
apply(*tcpStatsdSink)
}

// sinkOptionFunc wraps a func so it satisfies the Option interface.
type sinkOptionFunc func(*tcpStatsdSink)

func (f sinkOptionFunc) apply(sink *tcpStatsdSink) {
f(sink)
}

// WithStatsdHost sets the host of the statsd sink otherwise the host is
// read from the environment variable "STATSD_HOST".
func WithStatsdHost(host string) SinkOption {
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.statsdHost = host
})
}

// WithStatsdPort sets the port of the statsd sink otherwise the port is
// read from the environment variable "STATSD_PORT".
func WithStatsdPort(port int) SinkOption {
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.statsdPort = port
})
}

// WithLogger configures the sink to use the provided logger otherwise
// the standard logrus logger is used.
func WithLogger(log *logger.Logger) SinkOption {
// TODO (CEV): use the zap.Logger
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.log = log
})
}

// NewTCPStatsdSink returns a FlushableSink that is backed by a buffered writer
// and a separate goroutine that flushes those buffers to a statsd connection.
func NewTCPStatsdSink() FlushableSink {
func NewTCPStatsdSink(opts ...SinkOption) FlushableSink {
outc := make(chan *bytes.Buffer, chanSize) // TODO(btc): parameterize
writer := sinkWriter{
outc: outc,
}
// TODO (CEV): this auto loading from the env is bad and should be removed.
conf := GetSettings()
s := &tcpStatsdSink{
outc: outc,
// TODO(btc): parameterize size
bufWriter: bufio.NewWriterSize(&writer, defaultBufferSize),
// arbitrarily buffered
doFlush: make(chan chan struct{}, 8),
// CEV: default to the standard logger to match the legacy implementation.
log: logger.StandardLogger(),
statsdHost: conf.StatsdHost,
statsdPort: conf.StatsdPort,
}
for _, opt := range opts {
opt.apply(s)
}
go s.run()
return s
Expand All @@ -55,6 +96,9 @@ type tcpStatsdSink struct {
bufWriter *bufio.Writer
doFlush chan chan struct{}
droppedBytes uint64
log *logger.Logger
statsdHost string
statsdPort int
}

type sinkWriter struct {
Expand Down Expand Up @@ -92,11 +136,26 @@ func (s *tcpStatsdSink) flush() error {
return err
}

func (s *tcpStatsdSink) drainFlushQueue() {
// Limit the number of items we'll flush to prevent this from possibly
// hanging when the flush channel is saturated with sends.
doFlush := s.doFlush
n := cap(doFlush) * 8
for i := 0; i < n; i++ {
select {
case ch := <-doFlush:
close(ch)
default:
return
}
}
}

// s.mu should be held
func (s *tcpStatsdSink) handleFlushErrorSize(err error, dropped int) {
d := uint64(dropped)
if (s.droppedBytes+d)%logOnEveryNDroppedBytes > s.droppedBytes%logOnEveryNDroppedBytes {
logger.WithField("total_dropped_bytes", s.droppedBytes+d).
s.log.WithField("total_dropped_bytes", s.droppedBytes+d).
WithField("dropped_bytes", d).
Error(err)
}
Expand Down Expand Up @@ -177,20 +236,29 @@ func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
}

func (s *tcpStatsdSink) run() {
conf := GetSettings()
addr := net.JoinHostPort(conf.StatsdHost, strconv.Itoa(conf.StatsdPort))
addr := net.JoinHostPort(s.statsdHost, strconv.Itoa(s.statsdPort))

var reconnectFailed bool // true if last reconnect failed

t := time.NewTicker(flushInterval)
defer t.Stop()
for {
if s.conn == nil {
if err := s.connect(addr); err != nil {
logger.Warnf("statsd connection error: %s", err)
s.log.Warnf("statsd connection error: %s", err)

// If the previous reconnect attempt failed, drain the flush
// queue to prevent Flush() from blocking indefinitely.
if reconnectFailed {
s.drainFlushQueue()
}
reconnectFailed = true

// TODO (CEV): don't sleep on the first retry
time.Sleep(3 * time.Second)
time.Sleep(defaultRetryInterval)
continue
}
reconnectFailed = false
}

select {
Expand Down
Loading

0 comments on commit 1b56639

Please sign in to comment.