Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush based on buffer size rather than time #699

Merged
merged 1 commit into from
Feb 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (a *Agent) Connect() error {
}
err := o.Output.Connect()
if err != nil {
log.Printf("Failed to connect to output %s, retrying in 15s, error was '%s' \n", o.Name, err)
log.Printf("Failed to connect to output %s, retrying in 15s, "+
"error was '%s' \n", o.Name, err)
time.Sleep(15 * time.Second)
err = o.Output.Connect()
if err != nil {
Expand Down Expand Up @@ -241,7 +242,7 @@ func (a *Agent) Test() error {
return nil
}

// flush writes a list of points to all configured outputs
// flush writes a list of metrics to all configured outputs
func (a *Agent) flush() {
var wg sync.WaitGroup

Expand All @@ -260,7 +261,7 @@ func (a *Agent) flush() {
wg.Wait()
}

// flusher monitors the points input channel and flushes on the minimum interval
// flusher monitors the metrics input channel and flushes on the minimum interval
func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) error {
// Inelegant, but this sleep is to allow the Gather threads to run, so that
// the flusher will flush after metrics are collected.
Expand All @@ -271,14 +272,14 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
for {
select {
case <-shutdown:
log.Println("Hang on, flushing any cached points before shutdown")
log.Println("Hang on, flushing any cached metrics before shutdown")
a.flush()
return nil
case <-ticker.C:
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
o.AddPoint(m)
o.AddMetric(m)
}
}
}
Expand Down Expand Up @@ -318,8 +319,8 @@ func (a *Agent) Run(shutdown chan struct{}) error {
a.Config.Agent.Interval.Duration, a.Config.Agent.Debug, a.Config.Agent.Quiet,
a.Config.Agent.Hostname, a.Config.Agent.FlushInterval.Duration)

// channel shared between all input threads for accumulating points
metricC := make(chan telegraf.Metric, 1000)
// channel shared between all input threads for accumulating metrics
metricC := make(chan telegraf.Metric, 10000)

// Round collection to nearest interval by sleeping
if a.Config.Agent.RoundInterval {
Expand All @@ -342,7 +343,10 @@ func (a *Agent) Run(shutdown chan struct{}) error {
// Start service of any ServicePlugins
switch p := input.Input.(type) {
case telegraf.ServiceInput:
if err := p.Start(); err != nil {
acc := NewAccumulator(input.Config, metricC)
acc.SetDebug(a.Config.Agent.Debug)
acc.setDefaultTags(a.Config.Tags)
if err := p.Start(acc); err != nil {
log.Printf("Service for input %s failed to start, exiting\n%s\n",
input.Name, err.Error())
return err
Expand Down
34 changes: 24 additions & 10 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,37 @@

# Configuration for telegraf agent
[agent]
# Default data collection interval for all plugins
### Default data collection interval for all inputs
interval = "10s"
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
### Rounds collection interval to 'interval'
### ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

# Default data flushing interval for all outputs. You should not set this below
# interval. Maximum flush_interval will be flush_interval + flush_jitter
### Telegraf will cache metric_buffer_limit metrics for each output, and will
### flush this buffer on a successful write.
metric_buffer_limit = 10000
### Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true

### Collection jitter is used to jitter the collection by a random amount.
### Each plugin will sleep for a random time within jitter before collecting.
### This can be used to avoid many plugins querying things like sysfs at the
### same time, which can have a measurable effect on the system.
collection_jitter = "0s"

### Default flushing interval for all outputs. You shouldn't set this below
### interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
# Jitter the flush interval by a random amount. This is primarily to avoid
# large write spikes for users running a large number of telegraf instances.
# ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
### Jitter the flush interval by a random amount. This is primarily to avoid
### large write spikes for users running a large number of telegraf instances.
### ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"

# Run telegraf in debug mode
### Run telegraf in debug mode
debug = false
# Override default hostname, if empty use os.Hostname()
### Run telegraf in quiet mode
quiet = false
### Override default hostname, if empty use os.Hostname()
hostname = ""


Expand Down
2 changes: 1 addition & 1 deletion input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ServiceInput interface {
Gather(Accumulator) error

// Start starts the ServiceInput's service, whatever that may be
Start() error
Start(Accumulator) error

// Stop stops the services and closes any necessary channels and connections
Stop()
Expand Down
12 changes: 10 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type AgentConfig struct {
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration

// Interval at which to flush data
// FlushInterval is the Interval at which to flush data
FlushInterval internal.Duration

// FlushJitter Jitters the flush interval by a random amount.
Expand All @@ -82,6 +82,11 @@ type AgentConfig struct {
// full, the oldest metrics will be overwritten.
MetricBufferLimit int

// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
// it fills up, regardless of FlushInterval. Setting this option to true
// does _not_ deactivate FlushInterval.
FlushBufferWhenFull bool

// TODO(cam): Remove UTC and Precision parameters, they are no longer
// valid for the agent config. Leaving them here for now for backwards-
// compatability
Expand Down Expand Up @@ -157,6 +162,8 @@ var header = `##################################################################
### Telegraf will cache metric_buffer_limit metrics for each output, and will
### flush this buffer on a successful write.
metric_buffer_limit = 10000
### Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true

### Collection jitter is used to jitter the collection by a random amount.
### Each plugin will sleep for a random time within jitter before collecting.
Expand Down Expand Up @@ -421,8 +428,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {

ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBufferLimit > 0 {
ro.PointBufferLimit = c.Agent.MetricBufferLimit
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
ro.Quiet = c.Agent.Quiet
c.Outputs = append(c.Outputs, ro)
return nil
Expand Down
115 changes: 87 additions & 28 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,34 @@ package internal_models

import (
"log"
"sync"
"time"

"github.com/influxdata/telegraf"
)

const DEFAULT_POINT_BUFFER_LIMIT = 10000
const (
// Default number of metrics kept between flushes.
DEFAULT_METRIC_BUFFER_LIMIT = 10000

// Limit how many full metric buffers are kept due to failed writes.
FULL_METRIC_BUFFERS_LIMIT = 100
)

type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
PointBufferLimit int
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
FlushBufferWhenFull bool

metrics []telegraf.Metric
overwriteCounter int
metrics []telegraf.Metric
tmpmetrics map[int][]telegraf.Metric
overwriteI int
mapI int

sync.Mutex
}

func NewRunningOutput(
Expand All @@ -26,47 +38,94 @@ func NewRunningOutput(
conf *OutputConfig,
) *RunningOutput {
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
Output: output,
Config: conf,
PointBufferLimit: DEFAULT_POINT_BUFFER_LIMIT,
Name: name,
metrics: make([]telegraf.Metric, 0),
tmpmetrics: make(map[int][]telegraf.Metric),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
}
return ro
}

func (ro *RunningOutput) AddPoint(point telegraf.Metric) {
// AddMetric adds a metric to the output. This function can also write cached
// points if FlushBufferWhenFull is true.
func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
if ro.Config.Filter.IsActive {
if !ro.Config.Filter.ShouldMetricPass(point) {
if !ro.Config.Filter.ShouldMetricPass(metric) {
return
}
}
ro.Lock()
defer ro.Unlock()

if len(ro.metrics) < ro.PointBufferLimit {
ro.metrics = append(ro.metrics, point)
if len(ro.metrics) < ro.MetricBufferLimit {
ro.metrics = append(ro.metrics, metric)
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] config " +
"if you do not wish to overwrite metrics.\n")
if ro.overwriteCounter == len(ro.metrics) {
ro.overwriteCounter = 0
if ro.FlushBufferWhenFull {
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0)
err := ro.write(tmpmetrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
ro.mapI = 0
// overwrite one
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
} else {
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
}
}
} else {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
if ro.overwriteI == len(ro.metrics) {
ro.overwriteI = 0
}
ro.metrics[ro.overwriteI] = metric
ro.overwriteI++
}
ro.metrics[ro.overwriteCounter] = point
ro.overwriteCounter++
}
}

// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
ro.Lock()
defer ro.Unlock()
err := ro.write(ro.metrics)
if err != nil {
return err
} else {
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteI = 0
}

// Write any cached metric buffers that failed previously
for i, tmpmetrics := range ro.tmpmetrics {
if err := ro.write(tmpmetrics); err != nil {
return err
} else {
delete(ro.tmpmetrics, i)
}
}

return nil
}

func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
start := time.Now()
err := ro.Output.Write(ro.metrics)
err := ro.Output.Write(metrics)
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(ro.metrics), ro.Name, elapsed)
len(metrics), ro.Name, elapsed)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteCounter = 0
}
return err
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/github_webhooks/github_webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (gh *GithubWebhooks) Listen() {
}
}

func (gh *GithubWebhooks) Start() error {
func (gh *GithubWebhooks) Start(_ telegraf.Accumulator) error {
go gh.Listen()
log.Printf("Started the github_webhooks service on %s\n", gh.ServiceAddress)
return nil
Expand Down
Loading