Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a couple of bugs in the logic for how AWS metric periods are calculated #32724

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Metricbeat*

- Fix and improve AWS metric period calculation to avoid zero-length intervals {pull}32724[32724]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
startDate, endDate := getStartDateEndDate(m.Period)

// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)

// get cost metrics from cost explorer
awsBeatsConfig := m.MetricSet.AwsConfig.Copy()
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)
m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime)

// Check statistic method in config
Expand Down
10 changes: 5 additions & 5 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ func TestCreateEventsWithIdentifier(t *testing.T) {
Value: "test-ec2",
},
}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1432,7 +1432,7 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func TestCreateEventsWithTagsFilter(t *testing.T) {
},
}

startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, 1, len(events))
Expand Down Expand Up @@ -1630,7 +1630,7 @@ func TestCreateEventsTimestamp(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

cloudwatchMock := &MockCloudWatchClientWithoutDim{}
resGroupTaggingClientMock := &MockResourceGroupsTaggingClient{}
Expand All @@ -1644,6 +1644,6 @@ func TestGetStartTimeEndTime(t *testing.T) {
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5 * time.Minute}
m.logger = logp.NewLogger("test")
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
assert.Equal(t, 5*time.Minute, endTime.Sub(startTime))
}
32 changes: 16 additions & 16 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import (
resourcegroupstaggingapitypes "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/types"
)

// GetStartTimeEndTime function uses durationString to create startTime and endTime for queries.
func GetStartTimeEndTime(period time.Duration, latency time.Duration) (time.Time, time.Time) {
endTime := time.Now()
if latency != 0 {
// add latency if config is not 0
endTime = endTime.Add(latency * -1)
}

// Set startTime to be one period earlier than the endTime. If metrics are
// not being collected, use latency config parameter to offset the startTime
// and endTime.
startTime := endTime.Add(period * -1)
// Defining duration
d := 60 * time.Second
// Calling Round() method
return startTime.Round(d), endTime.Round(d)
// GetStartTimeEndTime calculates start and end times for queries based on the current time and a duration.
//
// Whilst the inputs to this function are continuous, the maximum period granularity we can consistently use
// is 1 minute. The resulting interval should also be aligned to the period for best performance. This means
// if a period of 3 minutes is requested at 12:05, for example, the calculated times are 12:00->12:03. See
// https://github.com/aws/aws-sdk-go-v2/blob/fdbd882cdf5c63a578caed14688cf9a456c75f2b/service/cloudwatch/api_op_GetMetricData.go#L88
// for more information about granularity and period alignment.
//
// If durations are configured in non-whole minute periods, they are rounded up to the next minute e.g. 90s becomes 120s.
//
// If `latency` is configured, the period is shifted back in time by specified duration (before period alignment).
func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration) (time.Time, time.Time) {
periodInMinutes := (period + time.Second*29).Round(time.Second * 60)
endTime := now.Add(latency * -1).Truncate(periodInMinutes)
startTime := endTime.Add(periodInMinutes * -1)
return startTime, endTime
}

// GetListMetricsOutput function gets listMetrics results from cloudwatch ~~per namespace~~ for each region.
Expand Down
136 changes: 134 additions & 2 deletions x-pack/metricbeat/module/aws/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestGetListMetricsOutputWithWildcard(t *testing.T) {
}

func TestGetMetricDataPerRegion(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0)
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)

mockSvc := &MockCloudWatchClient{}
var metricDataQueries []cloudwatchtypes.MetricDataQuery
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestGetMetricDataPerRegion(t *testing.T) {
}

func TestGetMetricDataResults(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0)
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)

mockSvc := &MockCloudWatchClient{}
metricInfo := cloudwatchtypes.Metric{
Expand Down Expand Up @@ -434,3 +434,135 @@ func TestGetResourcesTags(t *testing.T) {
}
assert.Equal(t, expectedResourceTagMap, resourceTagMap)
}

func parseTime(t *testing.T, in string) time.Time {
time, err := time.Parse(time.RFC3339, in)
if err != nil {
t.Errorf("test setup failed - could not parse time with time.RFC3339: %s", in)
}
return time
}

func TestGetStartTimeEndTime(t *testing.T) {
var cases = []struct {
title string
start string
period time.Duration
latency time.Duration
expectedStart string
expectedEnd string
}{
// window should align with period e.g. requesting a 5 minute period at 10:27 gives 10:20->10:25
{"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
{"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"},
{"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"},
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
{"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
{"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"},

// latency should shift the time *before* period alignment
// e.g. requesting a 5 minute period at 10:27 with 1 minutes latency still gives 10:20->10:25,
// but with 3 minutes latency gives 10:15->10:20
{"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"},
{"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
{"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"},
{"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"},

// non-whole-minute periods should be rounded up to the nearest minute; latency is applied as-is before period adjustment
{"20 seconds, 45 second latency", "2022-08-15T13:38:45Z", time.Second * 20, time.Second * 45, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
{"1.5 minutes, 60 second latency", "2022-08-15T13:38:45Z", time.Second * 90, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
{"just less than 5 minutes, 3 minute latency", "2022-08-15T13:38:45Z", time.Second * 59 * 5, time.Second * 90, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
}

for _, tt := range cases {
t.Run(tt.title, func(t *testing.T) {
startTime, expectedStartTime, expectedEndTime := parseTime(t, tt.start), parseTime(t, tt.expectedStart), parseTime(t, tt.expectedEnd)

start, end := GetStartTimeEndTime(startTime, tt.period, tt.latency)

if expectedStartTime != start || expectedEndTime != end {
t.Errorf("got (%s, %s), want (%s, %s)", start, end, tt.expectedStart, tt.expectedEnd)
}
})
}
}

func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) {
type interval struct {
start, end string
}

startTime := parseTime(t, "2022-08-24T11:01:00Z")
numCalls := 5

var cases = []struct {
title string
period time.Duration
latency time.Duration
expectedIntervals []interval
}{
// with no latency
{"1 minute", time.Second * 60, 0, []interval{
{"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"},
{"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"},
{"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"},
{"2022-08-24T11:03:00Z", "2022-08-24T11:04:00Z"},
{"2022-08-24T11:04:00Z", "2022-08-24T11:05:00Z"},
}},
{"2 minutes", time.Second * 60 * 2, 0, []interval{
{"2022-08-24T10:58:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:02:00Z"},
{"2022-08-24T11:02:00Z", "2022-08-24T11:04:00Z"},
{"2022-08-24T11:04:00Z", "2022-08-24T11:06:00Z"},
{"2022-08-24T11:06:00Z", "2022-08-24T11:08:00Z"},
}},
{"3 minutes", time.Second * 60 * 3, 0, []interval{
{"2022-08-24T10:57:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:03:00Z"},
{"2022-08-24T11:03:00Z", "2022-08-24T11:06:00Z"},
{"2022-08-24T11:06:00Z", "2022-08-24T11:09:00Z"},
{"2022-08-24T11:09:00Z", "2022-08-24T11:12:00Z"},
}},
{"5 minutes", time.Second * 60 * 5, 0, []interval{
{"2022-08-24T10:55:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:05:00Z"},
{"2022-08-24T11:05:00Z", "2022-08-24T11:10:00Z"},
{"2022-08-24T11:10:00Z", "2022-08-24T11:15:00Z"},
{"2022-08-24T11:15:00Z", "2022-08-24T11:20:00Z"},
}},
{"30 minutes", time.Second * 60 * 30, 0, []interval{
{"2022-08-24T10:30:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:30:00Z"},
{"2022-08-24T11:30:00Z", "2022-08-24T12:00:00Z"},
{"2022-08-24T12:00:00Z", "2022-08-24T12:30:00Z"},
{"2022-08-24T12:30:00Z", "2022-08-24T13:00:00Z"},
}},

// with 90s latency (sanity check)
{"1 minute with 2 minute latency", time.Second * 60, time.Second * 90, []interval{
{"2022-08-24T10:58:00Z", "2022-08-24T10:59:00Z"},
{"2022-08-24T10:59:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"},
{"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"},
{"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"},
}},
}

for _, tt := range cases {
t.Run(tt.title, func(t *testing.T) {
// get a few repeated intervals
intervals := make([]interval, numCalls)
for i := range intervals {
adjustedStartTime := startTime.Add(tt.period * time.Duration(i))
start, end := GetStartTimeEndTime(adjustedStartTime, tt.period, tt.latency)
intervals[i] = interval{start.Format(time.RFC3339), end.Format(time.RFC3339)}
}

for i, val := range intervals {
if val != tt.expectedIntervals[i] {
t.Errorf("got %v, want %v", intervals, tt.expectedIntervals)
break
}
}
})
}
}