diff --git a/CHANGELOG.md b/CHANGELOG.md index 77f5a082d2311..9319c775e7ecc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain! - [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod! - [#551](https://github.com/influxdb/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable. +- [#552](https://github.com/influxdata/telegraf/pull/552): Support for collection interval jittering. ### Bugfixes - [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert! diff --git a/agent.go b/agent.go index 25fd46462be41..25b157c544358 100644 --- a/agent.go +++ b/agent.go @@ -1,10 +1,11 @@ package telegraf import ( - "crypto/rand" + cryptorand "crypto/rand" "fmt" "log" "math/big" + "math/rand" "os" "sync" "time" @@ -92,6 +93,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { start := time.Now() counter := 0 + jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds() for _, input := range a.Config.Inputs { if input.Config.Interval != 0 { continue @@ -104,9 +106,19 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error { acc := NewAccumulator(input.Config, pointChan) acc.SetDebug(a.Config.Agent.Debug) - // acc.SetPrefix(input.Name + "_") acc.SetDefaultTags(a.Config.Tags) + if jitter != 0 { + nanoSleep := rand.Int63n(jitter) + d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep)) + if err != nil { + log.Printf("Jittering collection interval failed for plugin %s", + input.Name) + } else { + time.Sleep(d) + } + } + if err := input.Input.Gather(acc); err != nil { log.Printf("Error in input [%s]: %s", input.Name, err) } @@ -143,7 +155,6 @@ func (a *Agent) gatherSeparate( acc := NewAccumulator(input.Config, pointChan) acc.SetDebug(a.Config.Agent.Debug) - // acc.SetPrefix(input.Name + "_") acc.SetDefaultTags(a.Config.Tags) if err := input.Input.Gather(acc); err != nil { @@ -315,7 +326,7 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration { outinterval := ininterval if injitter.Nanoseconds() != 0 { maxjitter := big.NewInt(injitter.Nanoseconds()) - if j, err := rand.Int(rand.Reader, maxjitter); err == nil { + if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil { jitter = j.Int64() } outinterval = time.Duration(jitter + ininterval.Nanoseconds()) diff --git a/internal/config/config.go b/internal/config/config.go index ca4972b69793f..e4d7fbc9df4a6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -61,13 +61,22 @@ type AgentConfig struct { // ie, if Interval=10s then always collect on :00, :10, :20, etc. RoundInterval bool + // CollectionJitter is used to jitter the collection by a random amount. + // Each plugin will sleep for a random time within jitter before collecting. + // This can be used to avoid many plugins querying things like sysfs at the + // same time, which can have a measurable effect on the system. + CollectionJitter internal.Duration + // Interval at which to flush data FlushInterval internal.Duration // FlushRetries is the number of times to retry each data flush FlushRetries int - // FlushJitter tells + // FlushJitter Jitters the flush interval by a random amount. + // This is primarily to avoid large write spikes for users running a large + // number of telegraf instances. + // ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s FlushJitter internal.Duration // TODO(cam): Remove UTC and Precision parameters, they are no longer @@ -271,6 +280,11 @@ var header = `# Telegraf configuration # Rounds collection interval to 'interval' # ie, if interval="10s" then always collect on :00, :10, :20, etc. round_interval = true + # Collection jitter is used to jitter the collection by a random amount. + # Each plugin will sleep for a random time within jitter before collecting. + # This can be used to avoid many plugins querying things like sysfs at the + # same time, which can have a measurable effect on the system. + collection_jitter = "0s" # Default data flushing interval for all outputs. You should not set this below # interval. Maximum flush_interval will be flush_interval + flush_jitter