Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor running_output buffering #1087

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ config.
* **interval**: Default data collection interval for all inputs
* **round_interval**: Rounds collection interval to 'interval'
ie, if interval="10s" then always collect on :00, :10, :20, etc.
* **metric_batch_size**: Telegraf will send metrics to output in batch of at
most metric_batch_size metrics.
* **metric_buffer_limit**: Telegraf will cache metric_buffer_limit metrics
for each output, and will flush this buffer on a successful write.
This should be a multiple of metric_batch_size and could not be less
than 2 times metric_batch_size.
* **collection_jitter**: Collection jitter is used to jitter
the collection by a random amount.
Each plugin will sleep for a random time within jitter before collecting.
Expand Down
8 changes: 6 additions & 2 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

## Telegraf will send metrics to output in batch of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write.
metric_buffer_limit = 1000
## flush this buffer on a successful write. This should be a multiple of
## metric_batch_size and could not be less than 2 times metric_batch_size
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true

Expand Down
19 changes: 16 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,15 @@ type AgentConfig struct {
// ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
FlushJitter internal.Duration

// MetricBatchSize is the maximum number of metrics that is wrote to an
// output plugin in one call.
MetricBatchSize int

// MetricBufferLimit is the max number of metrics that each output plugin
// will cache. The buffer is cleared when a successful write occurs. When
// full, the oldest metrics will be overwritten.
// full, the oldest metrics will be overwritten. This number should be a
// multiple of MetricBatchSize. Due to current implementation, this could
// not be less than 2 times MetricBatchSize.
MetricBufferLimit int

// FlushBufferWhenFull tells Telegraf to flush the metric buffer whenever
Expand Down Expand Up @@ -182,9 +188,13 @@ var header = `# Telegraf Configuration
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true

## Telegraf will send metrics to output in batch of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write.
metric_buffer_limit = 1000
## flush this buffer on a successful write. This should be a multiple of
## metric_batch_size and could not be less than 2 times metric_batch_size
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true

Expand Down Expand Up @@ -526,6 +536,9 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
}

ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBatchSize > 0 {
ro.MetricBatchSize = c.Agent.MetricBatchSize
}
if c.Agent.MetricBufferLimit > 0 {
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
Expand Down
90 changes: 51 additions & 39 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,32 @@ import (
)

const (
// Default number of metrics kept between flushes.
DEFAULT_METRIC_BUFFER_LIMIT = 1000

// Limit how many full metric buffers are kept due to failed writes.
FULL_METRIC_BUFFERS_LIMIT = 100
// Default size of metrics batch size.
DEFAULT_METRIC_BATCH_SIZE = 1000

// Default number of metrics kept. It should be a multiple of batch size.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
)

// tmpmetrics point to batch of metrics ready to be wrote to output.
// readI point to the oldest batch of metrics (the first to sent to output). It
// may point to nil value if tmpmetrics is empty.
// writeI point to the next slot to buffer a batch of metrics is output fail to
// write.
type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
FlushBufferWhenFull bool

metrics []telegraf.Metric
tmpmetrics map[int][]telegraf.Metric
overwriteI int
mapI int
tmpmetrics []([]telegraf.Metric)
writeI int
readI int

sync.Mutex
}
Expand All @@ -40,10 +47,10 @@ func NewRunningOutput(
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
tmpmetrics: make(map[int][]telegraf.Metric),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
MetricBatchSize: DEFAULT_METRIC_BATCH_SIZE,
}
return ro
}
Expand All @@ -59,6 +66,17 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
ro.Lock()
defer ro.Unlock()

if ro.tmpmetrics == nil {
size := ro.MetricBufferLimit / ro.MetricBatchSize
// ro.metrics already contains one batch
size = size - 1

if size < 1 {
size = 1
}
ro.tmpmetrics = make([]([]telegraf.Metric), size)
}

// Filter any tagexclude/taginclude parameters before adding metric
if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 {
// In order to filter out tags, we need to create a new metric, since
Expand All @@ -72,62 +90,56 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(name, tags, fields, t)
}

if len(ro.metrics) < ro.MetricBufferLimit {
if len(ro.metrics) < ro.MetricBatchSize {
ro.metrics = append(ro.metrics, metric)
} else {
flushSuccess := true
if ro.FlushBufferWhenFull {
ro.metrics = append(ro.metrics, metric)
tmpmetrics := make([]telegraf.Metric, len(ro.metrics))
copy(tmpmetrics, ro.metrics)
ro.metrics = make([]telegraf.Metric, 0)
err := ro.write(tmpmetrics)
err := ro.write(ro.metrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
if len(ro.tmpmetrics) == FULL_METRIC_BUFFERS_LIMIT {
ro.mapI = 0
// overwrite one
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
} else {
ro.tmpmetrics[ro.mapI] = tmpmetrics
ro.mapI++
}
flushSuccess = false
}
} else {
if ro.overwriteI == 0 {
flushSuccess = false
}
if !flushSuccess {
if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
}
if ro.overwriteI == len(ro.metrics) {
ro.overwriteI = 0
}
ro.metrics[ro.overwriteI] = metric
ro.overwriteI++
ro.tmpmetrics[ro.writeI] = ro.metrics
ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.metrics = append(ro.metrics, metric)
}
}

// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
ro.Lock()
defer ro.Unlock()

// Write any cached metric buffers before, as those metrics are the
// oldest
for ro.tmpmetrics[ro.readI] != nil {
if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil {
return err
} else {
ro.tmpmetrics[ro.readI] = nil
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
}
}

err := ro.write(ro.metrics)
if err != nil {
return err
} else {
ro.metrics = make([]telegraf.Metric, 0)
ro.overwriteI = 0
}

// Write any cached metric buffers that failed previously
for i, tmpmetrics := range ro.tmpmetrics {
if err := ro.write(tmpmetrics); err != nil {
return err
} else {
delete(ro.tmpmetrics, i)
}
}

return nil
Expand Down
17 changes: 11 additions & 6 deletions internal/models/running_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestRunningOutputDefault(t *testing.T) {
assert.Len(t, m.Metrics(), 10)
}

// Test that the first metric gets overwritten if there is a buffer overflow.
// Test that the first metrics batch gets overwritten if there is a buffer overflow.
func TestRunningOutputOverwrite(t *testing.T) {
conf := &OutputConfig{
Filter: Filter{
Expand All @@ -203,6 +203,7 @@ func TestRunningOutputOverwrite(t *testing.T) {

m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.MetricBatchSize = 1
ro.MetricBufferLimit = 4

for _, metric := range first5 {
Expand Down Expand Up @@ -236,6 +237,7 @@ func TestRunningOutputMultiOverwrite(t *testing.T) {

m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.MetricBatchSize = 1
ro.MetricBufferLimit = 3

for _, metric := range first5 {
Expand Down Expand Up @@ -274,7 +276,8 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 5
ro.MetricBatchSize = 5
ro.MetricBufferLimit = 10

// Fill buffer to limit
for _, metric := range first5 {
Expand All @@ -286,7 +289,7 @@ func TestRunningOutputFlushWhenFull(t *testing.T) {
// add one more metric
ro.AddMetric(next5[0])
// now it flushed
assert.Len(t, m.Metrics(), 6)
assert.Len(t, m.Metrics(), 5)

// add one more metric and write it manually
ro.AddMetric(next5[1])
Expand All @@ -307,7 +310,8 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
m := &mockOutput{}
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 4
ro.MetricBatchSize = 4
ro.MetricBufferLimit = 12

// Fill buffer past limit twive
for _, metric := range first5 {
Expand All @@ -317,7 +321,7 @@ func TestRunningOutputMultiFlushWhenFull(t *testing.T) {
ro.AddMetric(metric)
}
// flushed twice
assert.Len(t, m.Metrics(), 10)
assert.Len(t, m.Metrics(), 8)
}

func TestRunningOutputWriteFail(t *testing.T) {
Expand All @@ -331,7 +335,8 @@ func TestRunningOutputWriteFail(t *testing.T) {
m.failWrite = true
ro := NewRunningOutput("test", m, conf)
ro.FlushBufferWhenFull = true
ro.MetricBufferLimit = 4
ro.MetricBatchSize = 4
ro.MetricBufferLimit = 12

// Fill buffer past limit twice
for _, metric := range first5 {
Expand Down