Skip to content

Commit

Permalink
Fix a couple of bugs in the logic for how AWS metric periods are calc…
Browse files Browse the repository at this point in the history
…ulated (#32724)

* Fix a couple of bugs in the logic for how AWS metric periods are calculated.

Firstly, we clarify that periods are always whole-minute durations.
Any period that is shorter than a minute, or is not a whole number of
minutes, is rounded up to the next whole minute.

Secondly, we ensure that the resulting time period is always in the past.
This stops us getting empty metrics for the current-minute period.

Thirdly, we follow the AWS guidelines of aligning periods to whole
multiples within the hour e.g. 10:25->10:30 instead of 10:27->10:32
for a 5 minute period.

* add test that validates intervals are continuous for given periods

* update changelog

* fix merge error
  • Loading branch information
tommyers-elastic committed Aug 25, 2022
1 parent 9cab775 commit df3733e
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Metricbeat*

- Fix and improve AWS metric period calculation to avoid zero-length intervals {pull}32724[32724]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
startDate, endDate := getStartDateEndDate(m.Period)

// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)

// get cost metrics from cost explorer
awsBeatsConfig := m.MetricSet.AwsConfig.Copy()
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)
m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime)

// Check statistic method in config
Expand Down
10 changes: 5 additions & 5 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ func TestCreateEventsWithIdentifier(t *testing.T) {
Value: "test-ec2",
},
}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1432,7 +1432,7 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func TestCreateEventsWithTagsFilter(t *testing.T) {
},
}

startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, 1, len(events))
Expand Down Expand Up @@ -1630,7 +1630,7 @@ func TestCreateEventsTimestamp(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

cloudwatchMock := &MockCloudWatchClientWithoutDim{}
resGroupTaggingClientMock := &MockResourceGroupsTaggingClient{}
Expand All @@ -1644,6 +1644,6 @@ func TestGetStartTimeEndTime(t *testing.T) {
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5 * time.Minute}
m.logger = logp.NewLogger("test")
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
assert.Equal(t, 5*time.Minute, endTime.Sub(startTime))
}
32 changes: 16 additions & 16 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import (
resourcegroupstaggingapitypes "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/types"
)

// GetStartTimeEndTime function uses durationString to create startTime and endTime for queries.
func GetStartTimeEndTime(period time.Duration, latency time.Duration) (time.Time, time.Time) {
endTime := time.Now()
if latency != 0 {
// add latency if config is not 0
endTime = endTime.Add(latency * -1)
}

// Set startTime to be one period earlier than the endTime. If metrics are
// not being collected, use latency config parameter to offset the startTime
// and endTime.
startTime := endTime.Add(period * -1)
// Defining duration
d := 60 * time.Second
// Calling Round() method
return startTime.Round(d), endTime.Round(d)
// GetStartTimeEndTime calculates start and end times for queries based on the current time and a duration.
//
// Whilst the inputs to this function are continuous, the maximum period granularity we can consistently use
// is 1 minute. The resulting interval should also be aligned to the period for best performance. This means
// if a period of 3 minutes is requested at 12:05, for example, the calculated times are 12:00->12:03. See
// https://github.com/aws/aws-sdk-go-v2/blob/fdbd882cdf5c63a578caed14688cf9a456c75f2b/service/cloudwatch/api_op_GetMetricData.go#L88
// for more information about granularity and period alignment.
//
// If durations are configured in non-whole minute periods, they are rounded up to the next minute e.g. 90s
// becomes 120s and 500ms becomes 60s. A zero or negative period is treated as one minute, so the returned
// interval is never empty and startTime is always before endTime.
//
// If `latency` is configured, the period is shifted back in time by specified duration (before period alignment).
//
// The returned values are (startTime, endTime), where endTime is `now - latency` truncated down to a multiple
// of the effective period and startTime is exactly one effective period earlier.
func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration) (time.Time, time.Time) {
	// Ceiling to the next whole minute. Duration.Truncate rounds toward zero,
	// so any remainder (even a sub-second one) bumps the period up a minute.
	periodInMinutes := period.Truncate(time.Minute)
	if periodInMinutes < period {
		periodInMinutes += time.Minute
	}
	// Time.Truncate is a no-op for d <= 0, which would produce a zero-length
	// (or inverted) interval; never query with less than one minute.
	if periodInMinutes < time.Minute {
		periodInMinutes = time.Minute
	}

	endTime := now.Add(-latency).Truncate(periodInMinutes)
	startTime := endTime.Add(-periodInMinutes)
	return startTime, endTime
}

// GetListMetricsOutput function gets listMetrics results from cloudwatch ~~per namespace~~ for each region.
Expand Down
136 changes: 134 additions & 2 deletions x-pack/metricbeat/module/aws/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestGetListMetricsOutputWithWildcard(t *testing.T) {
}

func TestGetMetricDataPerRegion(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0)
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)

mockSvc := &MockCloudWatchClient{}
var metricDataQueries []cloudwatchtypes.MetricDataQuery
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestGetMetricDataPerRegion(t *testing.T) {
}

func TestGetMetricDataResults(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0)
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)

mockSvc := &MockCloudWatchClient{}
metricInfo := cloudwatchtypes.Metric{
Expand Down Expand Up @@ -434,3 +434,135 @@ func TestGetResourcesTags(t *testing.T) {
}
assert.Equal(t, expectedResourceTagMap, resourceTagMap)
}

// parseTime parses in as an RFC 3339 timestamp for use as a test fixture.
// A malformed fixture aborts the calling test immediately, so assertions
// never run against a zero time.
func parseTime(t *testing.T, in string) time.Time {
	t.Helper() // report failures at the caller's line, not here

	parsed, err := time.Parse(time.RFC3339, in)
	if err != nil {
		t.Fatalf("test setup failed - could not parse time with time.RFC3339: %s", in)
	}
	return parsed
}

// TestGetStartTimeEndTime checks the window returned by GetStartTimeEndTime
// for a single fixed "now" across whole-minute periods, latency offsets, and
// periods that are not a whole number of minutes.
func TestGetStartTimeEndTime(t *testing.T) {
	// Every case is evaluated at the same instant.
	const now = "2022-08-15T13:38:45Z"

	cases := []struct {
		title         string
		period        time.Duration
		latency       time.Duration
		expectedStart string
		expectedEnd   string
	}{
		// window should align with period e.g. requesting a 5 minute period at 10:27 gives 10:20->10:25
		{"1 minute", time.Minute, 0, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
		{"2 minutes", 2 * time.Minute, 0, "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"},
		{"3 minutes", 3 * time.Minute, 0, "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"},
		{"5 minutes", 5 * time.Minute, 0, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
		{"30 minutes", 30 * time.Minute, 0, "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"},

		// latency should shift the time *before* period alignment
		// e.g. requesting a 5 minute period at 10:27 with 1 minute latency still gives 10:20->10:25,
		// but with 3 minutes latency gives 10:15->10:20
		{"1 minute, 10 minutes latency", time.Minute, 10 * time.Minute, "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"},
		{"2 minutes, 1 minute latency", 2 * time.Minute, time.Minute, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
		{"5 minutes, 4 minutes latency", 5 * time.Minute, 4 * time.Minute, "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"},
		{"30 minutes, 30 minutes latency", 30 * time.Minute, 30 * time.Minute, "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"},

		// non-whole-minute periods should be rounded up to the next minute; latency is applied as-is before period adjustment
		{"20 seconds, 45 second latency", 20 * time.Second, 45 * time.Second, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
		{"1.5 minutes, 60 second latency", 90 * time.Second, 60 * time.Second, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
		{"just less than 5 minutes, 90 second latency", 59 * 5 * time.Second, 90 * time.Second, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
	}

	for _, tt := range cases {
		t.Run(tt.title, func(t *testing.T) {
			wantStart := parseTime(t, tt.expectedStart)
			wantEnd := parseTime(t, tt.expectedEnd)

			start, end := GetStartTimeEndTime(parseTime(t, now), tt.period, tt.latency)

			// Compare instants with Equal rather than struct equality, which
			// also compares location pointers and monotonic readings.
			if !start.Equal(wantStart) || !end.Equal(wantEnd) {
				t.Errorf("got (%s, %s), want (%s, %s)", start, end, tt.expectedStart, tt.expectedEnd)
			}
		})
	}
}

// TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals simulates repeated
// collection cycles, each one period after the previous, and checks that the
// windows returned by GetStartTimeEndTime match the expected values and that
// each window starts exactly where the previous one ended.
func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) {
	type interval struct {
		start, end string
	}

	// Number of consecutive collection cycles simulated per case.
	const numCalls = 5
	startTime := parseTime(t, "2022-08-24T11:01:00Z")

	cases := []struct {
		title             string
		period            time.Duration
		latency           time.Duration
		expectedIntervals []interval
	}{
		// with no latency
		{"1 minute", time.Minute, 0, []interval{
			{"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"},
			{"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"},
			{"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"},
			{"2022-08-24T11:03:00Z", "2022-08-24T11:04:00Z"},
			{"2022-08-24T11:04:00Z", "2022-08-24T11:05:00Z"},
		}},
		{"2 minutes", 2 * time.Minute, 0, []interval{
			{"2022-08-24T10:58:00Z", "2022-08-24T11:00:00Z"},
			{"2022-08-24T11:00:00Z", "2022-08-24T11:02:00Z"},
			{"2022-08-24T11:02:00Z", "2022-08-24T11:04:00Z"},
			{"2022-08-24T11:04:00Z", "2022-08-24T11:06:00Z"},
			{"2022-08-24T11:06:00Z", "2022-08-24T11:08:00Z"},
		}},
		{"3 minutes", 3 * time.Minute, 0, []interval{
			{"2022-08-24T10:57:00Z", "2022-08-24T11:00:00Z"},
			{"2022-08-24T11:00:00Z", "2022-08-24T11:03:00Z"},
			{"2022-08-24T11:03:00Z", "2022-08-24T11:06:00Z"},
			{"2022-08-24T11:06:00Z", "2022-08-24T11:09:00Z"},
			{"2022-08-24T11:09:00Z", "2022-08-24T11:12:00Z"},
		}},
		{"5 minutes", 5 * time.Minute, 0, []interval{
			{"2022-08-24T10:55:00Z", "2022-08-24T11:00:00Z"},
			{"2022-08-24T11:00:00Z", "2022-08-24T11:05:00Z"},
			{"2022-08-24T11:05:00Z", "2022-08-24T11:10:00Z"},
			{"2022-08-24T11:10:00Z", "2022-08-24T11:15:00Z"},
			{"2022-08-24T11:15:00Z", "2022-08-24T11:20:00Z"},
		}},
		{"30 minutes", 30 * time.Minute, 0, []interval{
			{"2022-08-24T10:30:00Z", "2022-08-24T11:00:00Z"},
			{"2022-08-24T11:00:00Z", "2022-08-24T11:30:00Z"},
			{"2022-08-24T11:30:00Z", "2022-08-24T12:00:00Z"},
			{"2022-08-24T12:00:00Z", "2022-08-24T12:30:00Z"},
			{"2022-08-24T12:30:00Z", "2022-08-24T13:00:00Z"},
		}},

		// with 90s latency (sanity check)
		{"1 minute with 90 second latency", time.Minute, 90 * time.Second, []interval{
			{"2022-08-24T10:58:00Z", "2022-08-24T10:59:00Z"},
			{"2022-08-24T10:59:00Z", "2022-08-24T11:00:00Z"},
			{"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"},
			{"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"},
			{"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"},
		}},
	}

	for _, tt := range cases {
		t.Run(tt.title, func(t *testing.T) {
			// Guard against a malformed table so the comparison below cannot
			// index past the end of the expected list.
			if len(tt.expectedIntervals) != numCalls {
				t.Fatalf("test setup failed - want %d expected intervals, have %d", numCalls, len(tt.expectedIntervals))
			}

			// Simulate numCalls collection cycles, one period apart.
			intervals := make([]interval, numCalls)
			for i := range intervals {
				now := startTime.Add(tt.period * time.Duration(i))
				start, end := GetStartTimeEndTime(now, tt.period, tt.latency)
				intervals[i] = interval{start.Format(time.RFC3339), end.Format(time.RFC3339)}
			}

			for i, got := range intervals {
				if got != tt.expectedIntervals[i] {
					t.Errorf("got %v, want %v", intervals, tt.expectedIntervals)
					break
				}
			}

			// The property the test is named for: no gaps and no overlaps
			// between consecutive windows.
			for i := 1; i < len(intervals); i++ {
				if intervals[i].start != intervals[i-1].end {
					t.Errorf("interval %d starts at %s but interval %d ended at %s",
						i, intervals[i].start, i-1, intervals[i-1].end)
				}
			}
		})
	}
}

0 comments on commit df3733e

Please sign in to comment.