diff --git a/stats/config/init.go b/stats/config/init.go index 91142b769a..65cd824eda 100644 --- a/stats/config/init.go +++ b/stats/config/init.go @@ -3,6 +3,7 @@ package config import ( "flag" "strings" + "time" "github.com/grafana/metrictank/stats" "github.com/raintank/worldping-api/pkg/log" @@ -14,6 +15,7 @@ var prefix string var addr string var interval int var bufferSize int +var timeout time.Duration func ConfigSetup() { inStats := flag.NewFlagSet("stats", flag.ExitOnError) @@ -21,6 +23,7 @@ func ConfigSetup() { inStats.StringVar(&prefix, "prefix", "metrictank.stats.default.$instance", "stats prefix (will add trailing dot automatically if needed)") inStats.StringVar(&addr, "addr", "localhost:2003", "graphite address") inStats.IntVar(&interval, "interval", 1, "interval at which to send statistics") + inStats.DurationVar(&timeout, "timeout", time.Second*10, "timeout after which a write is considered not successful") inStats.IntVar(&bufferSize, "buffer-size", 20000, "how many messages (holding all measurements from one interval. rule of thumb: a message is ~25kB) to buffer up in case graphite endpoint is unavailable. With the default of 20k you will use max about 500MB and bridge 5 hours of downtime when needed") globalconf.Register("stats", inStats) } @@ -36,7 +39,7 @@ func ConfigProcess(instance string) { func Start() { if enabled { stats.NewMemoryReporter() - stats.NewGraphite(prefix, addr, interval, bufferSize) + stats.NewGraphite(prefix, addr, interval, bufferSize, timeout) } else { stats.NewDevnull() log.Warn("running metrictank without instrumentation.") diff --git a/stats/out_graphite.go b/stats/out_graphite.go index b66460e1a8..44b3482935 100644 --- a/stats/out_graphite.go +++ b/stats/out_graphite.go @@ -2,7 +2,9 @@ package stats import ( "bytes" + "io" "net" + "sync" "time" "github.com/raintank/worldping-api/pkg/log" @@ -25,10 +27,11 @@ type Graphite struct { prefix []byte addr string + timeout time.Duration toGraphite chan []byte } -func NewGraphite(prefix, addr string, interval int, bufferSize int) { +func NewGraphite(prefix, addr string, interval, bufferSize int, timeout time.Duration) { if len(prefix) != 0 && prefix[len(prefix)-1] != '.' { prefix = prefix + "." } @@ -44,6 +47,7 @@ func NewGraphite(prefix, addr string, interval int, bufferSize int) { prefix: []byte(prefix), addr: addr, toGraphite: make(chan []byte, bufferSize), + timeout: timeout, } go g.writer() go g.reporter(interval) @@ -80,31 +84,33 @@ func (g *Graphite) reporter(interval int) { } // writer connects to graphite and submits all pending data to it -// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay. this can also cause lost data for a second or two. func (g *Graphite) writer() { var conn net.Conn var err error + var wg sync.WaitGroup - assureConn := func() net.Conn { + assureConn := func() { connected.Set(conn != nil) for conn == nil { time.Sleep(time.Second) conn, err = net.Dial("tcp", g.addr) if err == nil { log.Info("stats now connected to %s", g.addr) + wg.Add(1) + go g.checkEOF(conn, &wg) } else { log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error()) } connected.Set(conn != nil) } - return conn } for buf := range g.toGraphite { queueItems.Value(len(g.toGraphite)) var ok bool for !ok { - conn = assureConn() + assureConn() + conn.SetWriteDeadline(time.Now().Add(g.timeout)) pre := time.Now() _, err = conn.Write(buf) if err == nil { @@ -113,8 +119,38 @@ func (g *Graphite) writer() { } else { log.Warn("stats failed to write to graphite: %s (took %s). will retry...", err, time.Now().Sub(pre)) conn.Close() + wg.Wait() conn = nil } } } } + +// normally the remote end should never write anything back +// but we know when we get EOF that the other end closed the conn +// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!) +// props to Tv` for this trick. +func (g *Graphite) checkEOF(conn net.Conn, wg *sync.WaitGroup) { + defer wg.Done() + b := make([]byte, 1024) + for { + num, err := conn.Read(b) + if err == io.EOF { + log.Info("Graphite.checkEOF: remote closed conn. closing conn") + conn.Close() + return + } + + // in case the remote behaves badly (out of spec for carbon protocol) + if num != 0 { + log.Warn("Graphite.checkEOF: read unexpected data from peer: %s\n", b[:num]) + continue + } + + if err != io.EOF { + log.Warn("Graphite.checkEOF: %s. closing conn\n", err) + conn.Close() + return + } + } +}