Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CloudWatch.publish() sometimes not jittering. #501

Merged
merged 16 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions plugins/outputs/cloudwatch/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func TestAggregator_NoAggregationKeyFound(t *testing.T) {

assertNoMetricsInChan(t, metricChan)
close(shutdownChan)
// Cleanup
wg.Wait()
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

func TestAggregator_NotDurationType(t *testing.T) {
Expand All @@ -60,6 +62,8 @@ func TestAggregator_NotDurationType(t *testing.T) {

assertNoMetricsInChan(t, metricChan)
close(shutdownChan)
// Cleanup
wg.Wait()
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

func TestAggregator_ProperAggregationKey(t *testing.T) {
Expand All @@ -78,6 +82,8 @@ func TestAggregator_ProperAggregationKey(t *testing.T) {

assertNoMetricsInChan(t, metricChan)
close(shutdownChan)
// Cleanup
wg.Wait()
}

func TestAggregator_MultipleAggregationPeriods(t *testing.T) {
Expand Down Expand Up @@ -121,6 +127,8 @@ func TestAggregator_MultipleAggregationPeriods(t *testing.T) {

assertNoMetricsInChan(t, metricChan)
close(shutdownChan)
// Cleanup
wg.Wait()
}

func TestAggregator_ShutdownBehavior(t *testing.T) {
Expand Down
114 changes: 77 additions & 37 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package cloudwatch

import (
"log"
"math"
"reflect"
"runtime"
"sort"
Expand Down Expand Up @@ -39,7 +38,7 @@ const (
pushIntervalInSec = 60 // 60 sec
highResolutionTagKey = "aws:StorageResolution"
defaultRetryCount = 5 // this is the retry count, the total attempts would be retry count + 1 at most.
backoffRetryBase = 200
backoffRetryBase = 200 * time.Millisecond
MaxDimensions = 30
)

Expand Down Expand Up @@ -74,7 +73,6 @@ type CloudWatch struct {
aggregatorWaitGroup sync.WaitGroup
metricChan chan telegraf.Metric
datumBatchChan chan []*cloudwatch.MetricDatum
datumBatchFullChan chan bool
metricDatumBatch *MetricDatumBatch
shutdownChan chan struct{}
pushTicker *time.Ticker
Expand Down Expand Up @@ -166,7 +164,6 @@ func (c *CloudWatch) Connect() error {
func (c *CloudWatch) startRoutines() {
c.metricChan = make(chan telegraf.Metric, metricChanBufferSize)
c.datumBatchChan = make(chan []*cloudwatch.MetricDatum, datumBatchChanBufferSize)
c.datumBatchFullChan = make(chan bool, 1)
c.shutdownChan = make(chan struct{})
c.aggregatorShutdownChan = make(chan struct{})
c.aggregator = NewAggregator(c.metricChan, c.aggregatorShutdownChan, &c.aggregatorWaitGroup)
Expand Down Expand Up @@ -279,45 +276,85 @@ func (c *CloudWatch) timeToPublish(b *MetricDatumBatch) bool {
return len(b.Partition) > 0 && time.Now().Sub(b.BeginTime) >= c.ForceFlushInterval.Duration
}

// getFirstPushMs returns the time at which the first upload should occur.
// It uses random jitter as an offset from the start of the given interval.
func getFirstPushMs(interval time.Duration) int64 {
publishJitter := publishJitter(interval)
log.Printf("I! cloudwatch: publish with ForceFlushInterval: %v, Publish Jitter: %v",
interval, publishJitter)
nowMs := time.Now().UnixMilli()
// Truncate i.e. round down, then add jitter.
// If the rounded down time is in the past, move it forward.
nextMs := nowMs - (nowMs % interval.Milliseconds()) + publishJitter.Milliseconds()
khanhntd marked this conversation as resolved.
Show resolved Hide resolved
if nextMs < nowMs {
nextMs += interval.Milliseconds()
}
return nextMs
}

// publish loops until a shutdown occurs.
// It periodically tries pushing batches of metrics (if there are any).
// If the batch buffer fills up, the interval will be gradually reduced to avoid
// many agents bursting the backend.
func (c *CloudWatch) publish() {
now := time.Now()
forceFlushInterval := c.ForceFlushInterval.Duration
publishJitter := publishJitter(forceFlushInterval)
log.Printf("I! cloudwatch: publish with ForceFlushInterval: %v, Publish Jitter: %v", forceFlushInterval, publishJitter)
time.Sleep(now.Truncate(forceFlushInterval).Add(publishJitter).Sub(now))
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
c.pushTicker = time.NewTicker(c.ForceFlushInterval.Duration)
defer c.pushTicker.Stop()
shouldPublish := false
currentInterval := c.ForceFlushInterval.Duration
nextMs := getFirstPushMs(currentInterval)
bufferFullOccurred := false

for {
shouldPublish := false
select {
case <-c.shutdownChan:
log.Printf("D! CloudWatch: publish routine receives the shutdown signal, exiting.")
log.Printf("D! cloudwatch: publish routine receives the shutdown signal, exiting.")
return
case <-c.pushTicker.C:
shouldPublish = true
case <-c.aggregatorShutdownChan:
shouldPublish = true
case <-c.metricDatumBatchFull():
shouldPublish = true
default:
shouldPublish = false
}

nowMs := time.Now().UnixMilli()

if c.metricDatumBatchFull() {
if !bufferFullOccurred {
// Set to true so this only happens once per push.
bufferFullOccurred = true
// Keep the interval at or above 1 second.
if currentInterval.Seconds() > 1 {
currentInterval /= 2
if currentInterval.Seconds() < 1 {
currentInterval = 1 * time.Second
}
// Cut the remaining interval in half.
nextMs = nowMs + ((nextMs - nowMs) / 2)
}
}
}

if nowMs >= nextMs {
SaxyPandaBear marked this conversation as resolved.
Show resolved Hide resolved
shouldPublish = true
// Restore interval if buffer did not fill up during this interval.
if !bufferFullOccurred {
currentInterval = c.ForceFlushInterval.Duration
}
nextMs += currentInterval.Milliseconds()
}

if shouldPublish {
c.pushMetricDatumBatch()
} else {
bufferFullOccurred = false
}
// Sleep 1 second, unless the nextMs is less than a second away.
if nextMs - nowMs > time.Second.Milliseconds() {
time.Sleep(time.Second)
} else {
time.Sleep(time.Duration(nextMs - nowMs) * time.Millisecond)
}
}
}

func (c *CloudWatch) metricDatumBatchFull() chan bool {
if len(c.datumBatchChan) >= datumBatchChanBufferSize {
if len(c.datumBatchFullChan) == 0 {
c.datumBatchFullChan <- true
}
return c.datumBatchFullChan
}
return nil
// metricDatumBatchFull returns true if the channel/buffer of batches if full.
func (c *CloudWatch) metricDatumBatchFull() bool {
return len(c.datumBatchChan) >= datumBatchChanBufferSize
adam-mateen marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *CloudWatch) pushMetricDatumBatch() {
Expand All @@ -332,16 +369,17 @@ func (c *CloudWatch) pushMetricDatumBatch() {
}
}

// backoffSleep sleeps some amount of time based on number of retries done.
// The delay grows exponentially with the retry count and is capped at one
// minute; half of it is randomized (jitter) so a fleet of agents does not
// retry in lockstep.
func (c *CloudWatch) backoffSleep() {
	d := 1 * time.Minute
	if c.retries <= defaultRetryCount {
		// Exponential backoff: base * 2^retries.
		d = backoffRetryBase * time.Duration(1<<c.retries)
	}
	// Keep half the delay fixed and jitter the other half.
	d = (d / 2) + publishJitter(d/2)
	log.Printf("W! cloudwatch: %v retries, going to sleep %v ms before retrying.",
		c.retries, d.Milliseconds())
	c.retries++
	time.Sleep(d)
}

func (c *CloudWatch) WriteToCloudWatch(req interface{}) {
Expand All @@ -357,13 +395,13 @@ func (c *CloudWatch) WriteToCloudWatch(req interface{}) {
if err != nil {
awsErr, ok := err.(awserr.Error)
if !ok {
log.Printf("E! Cannot cast PutMetricData error %v into awserr.Error.", err)
log.Printf("E! cloudwatch: Cannot cast PutMetricData error %v into awserr.Error.", err)
c.backoffSleep()
continue
}
switch awsErr.Code() {
case cloudwatch.ErrCodeLimitExceededFault, cloudwatch.ErrCodeInternalServiceFault:
log.Printf("W! cloudwatch PutMetricData, error: %s, message: %s",
log.Printf("W! cloudwatch: PutMetricData, error: %s, message: %s",
awsErr.Code(),
awsErr.Message())
c.backoffSleep()
Expand All @@ -379,7 +417,7 @@ func (c *CloudWatch) WriteToCloudWatch(req interface{}) {
break
}
if err != nil {
log.Println("E! WriteToCloudWatch failure, err: ", err)
log.Println("E! cloudwatch: WriteToCloudWatch failure, err: ", err)
}
}

Expand Down Expand Up @@ -601,7 +639,9 @@ func (c *CloudWatch) ProcessRollup(rawDimension []*cloudwatch.Dimension) [][]*cl
}

}
log.Printf("D! cloudwatch: Get Full dimensionList %v", fullDimensionsList)
if len(fullDimensionsList) > 0 && len(fullDimensionsList[0]) > 0 {
log.Printf("D! cloudwatch: Get Full dimensionList %v", fullDimensionsList)
}
return fullDimensionsList
}

Expand Down
Loading