Skip to content

Commit

Permalink
tcp_sink: don't hang on Flush() if reconnecting
Browse files Browse the repository at this point in the history
This fixes a bug where the TCP Sink would hang indefinitely on Flush()
if it was unable to reconnect.

This also significantly increases the code coverage of the TCP Sink
tests.
  • Loading branch information
Charlie Vieth committed Mar 31, 2020
1 parent 59f0da0 commit c6af809
Show file tree
Hide file tree
Showing 4 changed files with 568 additions and 11 deletions.
90 changes: 79 additions & 11 deletions tcp_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
logger "github.com/sirupsen/logrus"
)

// TODO(btc): add constructor that accepts functional options in order to allow
// users to choose the constants that work best for them. (Leave the existing
// c'tor for backwards compatibility)
// e.g. `func NewTCPStatsdSinkWithOptions(opts ...Option) Sink`

const (
defaultRetryInterval = time.Second * 3
defaultDialTimeout = defaultRetryInterval / 2
Expand All @@ -30,19 +25,65 @@ const (
chanSize = approxMaxMemBytes / defaultBufferSize
)

// An SinkOption configures a Sink.
type SinkOption interface {
apply(*tcpStatsdSink)
}

// sinkOptionFunc wraps a func so it satisfies the Option interface.
type sinkOptionFunc func(*tcpStatsdSink)

func (f sinkOptionFunc) apply(sink *tcpStatsdSink) {
f(sink)
}

// WithStatsdHost sets the host of the statsd sink otherwise the host is
// read from the environment variable "STATSD_HOST".
func WithStatsdHost(host string) SinkOption {
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.statsdHost = host
})
}

// WithStatsdPort sets the port of the statsd sink otherwise the port is
// read from the environment variable "STATSD_PORT".
func WithStatsdPort(port int) SinkOption {
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.statsdPort = port
})
}

// WithLogger configures the sink to use the provided logger otherwise
// the standard logrus logger is used.
func WithLogger(log *logger.Logger) SinkOption {
// TODO (CEV): use the zap.Logger
return sinkOptionFunc(func(sink *tcpStatsdSink) {
sink.log = log
})
}

// NewTCPStatsdSink returns a FlushableSink that is backed by a buffered writer
// and a separate goroutine that flushes those buffers to a statsd connection.
func NewTCPStatsdSink() FlushableSink {
func NewTCPStatsdSink(opts ...SinkOption) FlushableSink {
outc := make(chan *bytes.Buffer, chanSize) // TODO(btc): parameterize
writer := sinkWriter{
outc: outc,
}
// TODO (CEV): this auto loading from the env is bad and should be removed.
conf := GetSettings()
s := &tcpStatsdSink{
outc: outc,
// TODO(btc): parameterize size
bufWriter: bufio.NewWriterSize(&writer, defaultBufferSize),
// arbitrarily buffered
doFlush: make(chan chan struct{}, 8),
// CEV: default to the standard logger to match the legacy implementation.
log: logger.StandardLogger(),
statsdHost: conf.StatsdHost,
statsdPort: conf.StatsdPort,
}
for _, opt := range opts {
opt.apply(s)
}
go s.run()
return s
Expand All @@ -55,6 +96,9 @@ type tcpStatsdSink struct {
bufWriter *bufio.Writer
doFlush chan chan struct{}
droppedBytes uint64
log *logger.Logger
statsdHost string
statsdPort int
}

type sinkWriter struct {
Expand Down Expand Up @@ -92,11 +136,26 @@ func (s *tcpStatsdSink) flush() error {
return err
}

func (s *tcpStatsdSink) drainFlushQueue() {
// Limit the number of items we'll flush to prevent this from possibly
// hanging when the flush channel is saturated with sends.
doFlush := s.doFlush
n := cap(doFlush) * 8
for i := 0; i < n; i++ {
select {
case ch := <-doFlush:
close(ch)
default:
return
}
}
}

// s.mu should be held
func (s *tcpStatsdSink) handleFlushErrorSize(err error, dropped int) {
d := uint64(dropped)
if (s.droppedBytes+d)%logOnEveryNDroppedBytes > s.droppedBytes%logOnEveryNDroppedBytes {
logger.WithField("total_dropped_bytes", s.droppedBytes+d).
s.log.WithField("total_dropped_bytes", s.droppedBytes+d).
WithField("dropped_bytes", d).
Error(err)
}
Expand Down Expand Up @@ -177,20 +236,29 @@ func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
}

func (s *tcpStatsdSink) run() {
conf := GetSettings()
addr := net.JoinHostPort(conf.StatsdHost, strconv.Itoa(conf.StatsdPort))
addr := net.JoinHostPort(s.statsdHost, strconv.Itoa(s.statsdPort))

var reconnectFailed bool // true if last reconnect failed

t := time.NewTicker(flushInterval)
defer t.Stop()
for {
if s.conn == nil {
if err := s.connect(addr); err != nil {
logger.Warnf("statsd connection error: %s", err)
s.log.Warnf("statsd connection error: %s", err)

// If the previous reconnect attempt failed, drain the flush
// queue to prevent Flush() from blocking indefinitely.
if reconnectFailed {
s.drainFlushQueue()
}
reconnectFailed = true

// TODO (CEV): don't sleep on the first retry
time.Sleep(3 * time.Second)
time.Sleep(defaultRetryInterval)
continue
}
reconnectFailed = false
}

select {
Expand Down
Loading

0 comments on commit c6af809

Please sign in to comment.