Skip to content

Commit

Permalink
Add support for retrying output writes, using independent threads
Browse files Browse the repository at this point in the history
Fixes #285
  • Loading branch information
sparrc committed Oct 21, 2015
1 parent ac685d1 commit 58cca8c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
70 changes: 52 additions & 18 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type Agent struct {
// Interval at which to flush data
FlushInterval Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
Expand Down Expand Up @@ -61,6 +64,7 @@ func NewAgent(config *Config) (*Agent, error) {
Config: config,
Interval: Duration{10 * time.Second},
FlushInterval: Duration{10 * time.Second},
FlushRetries: 2,
UTC: true,
Precision: "s",
}
Expand Down Expand Up @@ -293,28 +297,56 @@ func (a *Agent) Test() error {
return nil
}

func (a *Agent) flush(points []*client.Point) {
var wg sync.WaitGroup

// writeOutput writes a list of points to a single output, with retries
func (a *Agent) writeOutput(
points []*client.Point,
ro *runningOutput,
shutdown chan struct{},
) {
retry := 0
retries := a.FlushRetries
start := time.Now()
counter := 0
for _, o := range a.outputs {
wg.Add(1)
counter++

go func(ro *runningOutput) {
defer wg.Done()
// Log all output errors:
if err := ro.output.Write(points); err != nil {
log.Printf("Error in output [%s]: %s", ro.name, err.Error())
for {
err := ro.output.Write(points)

select {
case <-shutdown:
return
default:
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.name, elapsed)
return
} else if retry >= retries {
// No more retries
msg := "FATAL: Write to output [%s] failed %d times, dropping" +
" %d metrics\n"
log.Printf(msg, ro.name, retries+1, len(points))
return
} else if err != nil {
// Sleep for a retry
log.Printf("Error in output [%s]: %s, retrying in %s",
ro.name, err.Error(), a.FlushInterval.Duration)
time.Sleep(a.FlushInterval.Duration)
}
}(o)
}

retry++
}
}

wg.Wait()
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to %d output sinks in %s\n",
len(points), counter, elapsed)
// flush writes a list of points to all configured outputs
func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) {
if len(points) == 0 {
return
}

for _, o := range a.outputs {
go a.writeOutput(points, o, shutdown)
}
}

// flusher monitors the points input channel and flushes on the minimum interval
Expand All @@ -327,9 +359,11 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
a.flush(points, shutdown)
return nil
case <-ticker.C:
a.flush(points)
a.flush(points, shutdown)
points = make([]*client.Point, 0)
case pt := <-pointChan:
points = append(points, pt)
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ var header = `# Telegraf configuration
interval = "10s"
# Default data flushing interval for all outputs
flush_interval = "10s"
# Number of times to retry each data flush
flush_retries = 2
# run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
Expand Down

0 comments on commit 58cca8c

Please sign in to comment.