Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collection interval random jittering #552

Merged
merged 1 commit into from
Jan 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain!
- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod!
- [#551](https://github.com/influxdb/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable.
- [#552](https://github.com/influxdata/telegraf/pull/552): Support for collection interval jittering.

### Bugfixes
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!
Expand Down
19 changes: 15 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package telegraf

import (
"crypto/rand"
cryptorand "crypto/rand"
"fmt"
"log"
"math/big"
"math/rand"
"os"
"sync"
"time"
Expand Down Expand Up @@ -92,6 +93,7 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

start := time.Now()
counter := 0
jitter := a.Config.Agent.CollectionJitter.Duration.Nanoseconds()
for _, input := range a.Config.Inputs {
if input.Config.Interval != 0 {
continue
Expand All @@ -104,9 +106,19 @@ func (a *Agent) gatherParallel(pointChan chan *client.Point) error {

acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug)
// acc.SetPrefix(input.Name + "_")
acc.SetDefaultTags(a.Config.Tags)

if jitter != 0 {
nanoSleep := rand.Int63n(jitter)
d, err := time.ParseDuration(fmt.Sprintf("%dns", nanoSleep))
if err != nil {
log.Printf("Jittering collection interval failed for plugin %s",
input.Name)
} else {
time.Sleep(d)
}
}

if err := input.Input.Gather(acc); err != nil {
log.Printf("Error in input [%s]: %s", input.Name, err)
}
Expand Down Expand Up @@ -143,7 +155,6 @@ func (a *Agent) gatherSeparate(

acc := NewAccumulator(input.Config, pointChan)
acc.SetDebug(a.Config.Agent.Debug)
// acc.SetPrefix(input.Name + "_")
acc.SetDefaultTags(a.Config.Tags)

if err := input.Input.Gather(acc); err != nil {
Expand Down Expand Up @@ -315,7 +326,7 @@ func jitterInterval(ininterval, injitter time.Duration) time.Duration {
outinterval := ininterval
if injitter.Nanoseconds() != 0 {
maxjitter := big.NewInt(injitter.Nanoseconds())
if j, err := rand.Int(rand.Reader, maxjitter); err == nil {
if j, err := cryptorand.Int(cryptorand.Reader, maxjitter); err == nil {
jitter = j.Int64()
}
outinterval = time.Duration(jitter + ininterval.Nanoseconds())
Expand Down
16 changes: 15 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,22 @@ type AgentConfig struct {
// ie, if Interval=10s then always collect on :00, :10, :20, etc.
RoundInterval bool

// CollectionJitter is used to jitter the collection by a random amount.
// Each plugin will sleep for a random time within jitter before collecting.
// This can be used to avoid many plugins querying things like sysfs at the
// same time, which can have a measurable effect on the system.
CollectionJitter internal.Duration

// Interval at which to flush data
FlushInterval internal.Duration

// FlushRetries is the number of times to retry each data flush
FlushRetries int

// FlushJitter tells
// FlushJitter Jitters the flush interval by a random amount.
// This is primarily to avoid large write spikes for users running a large
// number of telegraf instances.
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter internal.Duration

// TODO(cam): Remove UTC and Precision parameters, they are no longer
Expand Down Expand Up @@ -271,6 +280,11 @@ var header = `# Telegraf configuration
# Rounds collection interval to 'interval'
# ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
# Collection jitter is used to jitter the collection by a random amount.
# Each plugin will sleep for a random time within jitter before collecting.
# This can be used to avoid many plugins querying things like sysfs at the
# same time, which can have a measurable effect on the system.
collection_jitter = "0s"

# Default data flushing interval for all outputs. You should not set this below
# interval. Maximum flush_interval will be flush_interval + flush_jitter
Expand Down