Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

set deadline before stats write/read #918

Merged
merged 8 commits into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion stats/config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"flag"
"strings"
"time"

"github.com/grafana/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
Expand All @@ -14,13 +15,15 @@ var prefix string
var addr string
var interval int
var bufferSize int
var timeout time.Duration

func ConfigSetup() {
inStats := flag.NewFlagSet("stats", flag.ExitOnError)
inStats.BoolVar(&enabled, "enabled", true, "enable sending graphite messages for instrumentation")
inStats.StringVar(&prefix, "prefix", "metrictank.stats.default.$instance", "stats prefix (will add trailing dot automatically if needed)")
inStats.StringVar(&addr, "addr", "localhost:2003", "graphite address")
inStats.IntVar(&interval, "interval", 1, "interval at which to send statistics")
inStats.DurationVar(&timeout, "timeout", time.Second*10, "timeout after which a write is considered not successful")
inStats.IntVar(&bufferSize, "buffer-size", 20000, "how many messages (holding all measurements from one interval. rule of thumb: a message is ~25kB) to buffer up in case graphite endpoint is unavailable. With the default of 20k you will use max about 500MB and bridge 5 hours of downtime when needed")
globalconf.Register("stats", inStats)
}
Expand All @@ -36,7 +39,7 @@ func ConfigProcess(instance string) {
func Start() {
if enabled {
stats.NewMemoryReporter()
stats.NewGraphite(prefix, addr, interval, bufferSize)
stats.NewGraphite(prefix, addr, interval, bufferSize, timeout)
} else {
stats.NewDevnull()
log.Warn("running metrictank without instrumentation.")
Expand Down
46 changes: 41 additions & 5 deletions stats/out_graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package stats

import (
"bytes"
"io"
"net"
"sync"
"time"

"github.com/raintank/worldping-api/pkg/log"
Expand All @@ -25,10 +27,11 @@ type Graphite struct {
prefix []byte
addr string

timeout time.Duration
toGraphite chan []byte
}

func NewGraphite(prefix, addr string, interval int, bufferSize int) {
func NewGraphite(prefix, addr string, interval, bufferSize int, timeout time.Duration) {
if len(prefix) != 0 && prefix[len(prefix)-1] != '.' {
prefix = prefix + "."
}
Expand All @@ -44,6 +47,7 @@ func NewGraphite(prefix, addr string, interval int, bufferSize int) {
prefix: []byte(prefix),
addr: addr,
toGraphite: make(chan []byte, bufferSize),
timeout: timeout,
}
go g.writer()
go g.reporter(interval)
Expand Down Expand Up @@ -80,31 +84,33 @@ func (g *Graphite) reporter(interval int) {
}

// writer connects to graphite and submits all pending data to it
// TODO: conn.Write() returns no error for a while when the remote endpoint is down, the reconnect happens with a delay. this can also cause lost data for a second or two.
func (g *Graphite) writer() {
var conn net.Conn
var err error
var wg sync.WaitGroup

assureConn := func() net.Conn {
assureConn := func() {
connected.Set(conn != nil)
for conn == nil {
time.Sleep(time.Second)
conn, err = net.Dial("tcp", g.addr)
if err == nil {
log.Info("stats now connected to %s", g.addr)
wg.Add(1)
go g.checkEOF(conn, &wg)
} else {
log.Warn("stats dialing %s failed: %s. will retry", g.addr, err.Error())
}
connected.Set(conn != nil)
}
return conn
}

for buf := range g.toGraphite {
queueItems.Value(len(g.toGraphite))
var ok bool
for !ok {
conn = assureConn()
assureConn()
conn.SetWriteDeadline(time.Now().Add(g.timeout))
pre := time.Now()
_, err = conn.Write(buf)
if err == nil {
Expand All @@ -113,8 +119,38 @@ func (g *Graphite) writer() {
} else {
log.Warn("stats failed to write to graphite: %s (took %s). will retry...", err, time.Now().Sub(pre))
conn.Close()
wg.Wait()
conn = nil
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a race condition here. note how the write routine can set conn to nil, but checkEOF requires it to be non-nil.
particularly, the conn.Close() will activate the Read in checkEOF which will get an error, and try to call Close() on a pointer that can be nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you're right... that means i'll need to put a lock around conn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that should do it: 113554f

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good but why do we need the changes to how the conn variable is being set?

Copy link
Member

@woodsaj woodsaj May 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having conn = assureConn() is redundent.

The assureConn func is working with the exact same conn that we are going to write to as they are all in the same scope.

}
}

// normally the remote end should never write anything back
// but we know when we get EOF that the other end closed the conn
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv` for this trick.
func (g *Graphite) checkEOF(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
b := make([]byte, 1024)
for {
num, err := conn.Read(b)
if err == io.EOF {
log.Info("Graphite.checkEOF: remote closed conn. closing conn")
conn.Close()
return
}

// in case the remote behaves badly (out of spec for carbon protocol)
if num != 0 {
log.Warn("Graphite.checkEOF: read unexpected data from peer: %s\n", b[:num])
continue
}

if err != io.EOF {
log.Warn("Graphite.checkEOF: %s. closing conn\n", err)
conn.Close()
return
}
}
}