Skip to content

Commit

Permalink
[AWS CloudWatch Metrics] Record previous endTime to use for next coll…
Browse files Browse the repository at this point in the history
…ection period (#40870)

This PR is to record the previous endTime from the previous collection period to use for calculating the next collection start time and end time. This PR also changes log.logger from cloudwatch to aws.cloudwatch to match the other logger.
  • Loading branch information
kaiyan-sheng authored Sep 20, 2024
1 parent 5d2f58e commit cef78ed
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Fix bug in minimum length for request trace logging. {pull}39834[39834]
- Close connections properly in Filbeat's HTTPJSON input. {pull}39790[39790]
- Add the Offset property to libbeat/reader.Message to store the total number of bytes read and discarded before generating the message. This enables inputs to accurately determine how much data has been read up to the message, using Message.Bytes + Message.Offset. {pull}39873[39873] {issue}39653[39653]
- AWS CloudWatch Metrics record previous endTime to use for next collection period and change log.logger from cloudwatch to aws.cloudwatch. {pull}40870[40870]

==== Added

Expand Down
4 changes: 3 additions & 1 deletion x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type MetricSet struct {
*aws.MetricSet
logger *logp.Logger
CostExplorerConfig CostExplorerConfig `config:"cost_explorer_config"`
PreviousEndTime time.Time
}

// CostExplorerConfig holds a configuration specific for billing metricset.
Expand Down Expand Up @@ -121,7 +122,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
startDate, endDate := getStartDateEndDate(m.Period)

// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency, m.PreviousEndTime)
m.PreviousEndTime = endTime

// get cost metrics from cost explorer
awsBeatsConfig := m.MetricSet.AwsConfig.Copy()
Expand Down
6 changes: 4 additions & 2 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type MetricSet struct {
*aws.MetricSet
logger *logp.Logger
CloudwatchConfigs []Config `config:"metrics" validate:"nonzero,required"`
PreviousEndTime time.Time
}

// Dimension holds name and value for cloudwatch metricset dimension config.
Expand Down Expand Up @@ -87,7 +88,7 @@ type namespaceDetail struct {
// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
logger := logp.NewLogger(metricsetName)
logger := logp.NewLogger(aws.ModuleName + "." + metricsetName)
metricSet, err := aws.NewMetricSet(base)
if err != nil {
return nil, fmt.Errorf("error creating aws metricset: %w", err)
Expand Down Expand Up @@ -119,7 +120,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency, m.PreviousEndTime)
m.PreviousEndTime = endTime
m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime)

// Check statistic method in config
Expand Down
18 changes: 12 additions & 6 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,8 @@ func TestCreateEventsWithIdentifier(t *testing.T) {
}}
resourceTypeTagFilters := map[string][]aws.Tag{}
resourceTypeTagFilters["ec2:instance"] = nameTestEC2Tag
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
var previousEndTime time.Time
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1314,7 +1315,8 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
var previousEndTime time.Time
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1349,7 +1351,8 @@ func TestCreateEventsWithDataGranularity(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
var previousEndTime time.Time
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1393,7 +1396,8 @@ func TestCreateEventsWithTagsFilter(t *testing.T) {
resourceTypeTagFilters := map[string][]aws.Tag{}
resourceTypeTagFilters["ec2:instance"] = nameTestEC2Tag

startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
var previousEndTime time.Time
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime)
events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, 1, len(events))
Expand Down Expand Up @@ -1556,7 +1560,8 @@ func TestCreateEventsTimestamp(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
var previousEndTime time.Time
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime)

cloudwatchMock := &MockCloudWatchClientWithoutDim{}
resGroupTaggingClientMock := &MockResourceGroupsTaggingClient{}
Expand All @@ -1570,6 +1575,7 @@ func TestGetStartTimeEndTime(t *testing.T) {
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5 * time.Minute}
m.logger = logp.NewLogger("test")
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
var previousEndTime time.Time
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency, previousEndTime)
assert.Equal(t, 5*time.Minute, endTime.Sub(startTime))
}
13 changes: 10 additions & 3 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ const DefaultApiTimeout = 5 * time.Second
// If durations are configured in non-whole minute periods, they are rounded up to the next minute e.g. 90s becomes 120s.
//
// If `latency` is configured, the period is shifted back in time by specified duration (before period alignment).
func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration) (time.Time, time.Time) {
// If endTime of the previous collection period is recorded, then use this endTime as the new startTime. This will guarantee no gap between collection timestamps.
func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration, previousEndTime time.Time) (time.Time, time.Time) {
periodInMinutes := (period + time.Second*29).Round(time.Second * 60)
endTime := now.Add(latency * -1).Truncate(periodInMinutes)
startTime := endTime.Add(periodInMinutes * -1)
var startTime, endTime time.Time
if !previousEndTime.IsZero() {
startTime = previousEndTime
endTime = startTime.Add(periodInMinutes)
} else {
endTime = now.Add(latency * -1).Truncate(periodInMinutes)
startTime = endTime.Add(periodInMinutes * -1)
}
return startTime, endTime
}

Expand Down
77 changes: 47 additions & 30 deletions x-pack/metricbeat/module/aws/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func TestGetListMetricsOutputWithWildcard(t *testing.T) {
}

func TestGetMetricDataPerRegion(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)
var previousEndTime time.Time
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0, previousEndTime)

mockSvc := &MockCloudWatchClient{}
var metricDataQueries []cloudwatchtypes.MetricDataQuery
Expand Down Expand Up @@ -356,7 +357,8 @@ func TestGetMetricDataPerRegion(t *testing.T) {
}

func TestGetMetricDataResults(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)
var previousEndTime time.Time
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0, previousEndTime)

mockSvc := &MockCloudWatchClient{}
metricInfo := cloudwatchtypes.Metric{
Expand Down Expand Up @@ -393,7 +395,8 @@ func TestGetMetricDataResults(t *testing.T) {
}

func TestGetMetricDataResultsCrossAccounts(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)
var previousEndTime time.Time
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0, previousEndTime)

mockSvc := &MockCloudwatchClientCrossAccounts{}
metricInfo := cloudwatchtypes.Metric{
Expand Down Expand Up @@ -552,6 +555,11 @@ func TestGetResourcesTags(t *testing.T) {
}

func parseTime(t *testing.T, in string) time.Time {
var zeroTime time.Time
if in == "" {
return zeroTime
}

time, err := time.Parse(time.RFC3339, in)
if err != nil {
t.Errorf("test setup failed - could not parse time with time.RFC3339: %s", in)
Expand All @@ -561,39 +569,46 @@ func parseTime(t *testing.T, in string) time.Time {

func TestGetStartTimeEndTime(t *testing.T) {
var cases = []struct {
title string
start string
period time.Duration
latency time.Duration
expectedStart string
expectedEnd string
title string
start string
period time.Duration
latency time.Duration
previousEndTime string
expectedStart string
expectedEnd string
}{
// window should align with period e.g. requesting a 5 minute period at 10:27 gives 10:20->10:25
{"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
{"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"},
{"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"},
{"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
{"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"},
{"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "", "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
{"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "", "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"},
{"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "", "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"},
{"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "", "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
{"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "", "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"},

// latency should shift the time *before* period alignment
// e.g. requesting a 5 minute period at 10:27 with 1 minutes latency still gives 10:20->10:25,
// but with 3 minutes latency gives 10:15->10:20
{"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"},
{"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
{"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"},
{"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"},
{"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "", "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"},
{"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "", "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
{"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "", "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"},
{"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "", "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"},

// non-whole-minute periods should be rounded up to the nearest minute; latency is applied as-is before period adjustment
{"20 seconds, 45 second latency", "2022-08-15T13:38:45Z", time.Second * 20, time.Second * 45, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
{"1.5 minutes, 60 second latency", "2022-08-15T13:38:45Z", time.Second * 90, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
{"just less than 5 minutes, 3 minute latency", "2022-08-15T13:38:45Z", time.Second * 59 * 5, time.Second * 90, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
{"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:45:00Z", "2022-08-15T13:45:00Z", "2022-08-15T13:46:00Z"},
{"5 minute, 3 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 3, "2022-08-15T13:45:00Z", "2022-08-15T13:45:00Z", "2022-08-15T13:50:00Z"},

// latency should shift the time *before* period alignment
// previousEndTime should be the same as the next startTime
{"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "", "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"},
{"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "", "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
{"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "", "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"},
{"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "", "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"},
}

for _, tt := range cases {
t.Run(tt.title, func(t *testing.T) {
startTime, expectedStartTime, expectedEndTime := parseTime(t, tt.start), parseTime(t, tt.expectedStart), parseTime(t, tt.expectedEnd)
startTime, previousEndTime, expectedStartTime, expectedEndTime := parseTime(t, tt.start), parseTime(t, tt.previousEndTime), parseTime(t, tt.expectedStart), parseTime(t, tt.expectedEnd)

start, end := GetStartTimeEndTime(startTime, tt.period, tt.latency)
start, end := GetStartTimeEndTime(startTime, tt.period, tt.latency, previousEndTime)

if expectedStartTime != start || expectedEndTime != end {
t.Errorf("got (%s, %s), want (%s, %s)", start, end, tt.expectedStart, tt.expectedEnd)
Expand All @@ -607,45 +622,47 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) {
start, end string
}

var previousEndTime time.Time
startTime := parseTime(t, "2022-08-24T11:01:00Z")
numCalls := 5

var cases = []struct {
title string
period time.Duration
latency time.Duration
previousEndTime time.Time
expectedIntervals []interval
}{
// with no latency
{"1 minute", time.Second * 60, 0, []interval{
{"1 minute", time.Second * 60, 0, previousEndTime, []interval{
{"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"},
{"2022-08-24T11:01:00Z", "2022-08-24T11:02:00Z"},
{"2022-08-24T11:02:00Z", "2022-08-24T11:03:00Z"},
{"2022-08-24T11:03:00Z", "2022-08-24T11:04:00Z"},
{"2022-08-24T11:04:00Z", "2022-08-24T11:05:00Z"},
}},
{"2 minutes", time.Second * 60 * 2, 0, []interval{
{"2 minutes", time.Second * 60 * 2, 0, previousEndTime, []interval{
{"2022-08-24T10:58:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:02:00Z"},
{"2022-08-24T11:02:00Z", "2022-08-24T11:04:00Z"},
{"2022-08-24T11:04:00Z", "2022-08-24T11:06:00Z"},
{"2022-08-24T11:06:00Z", "2022-08-24T11:08:00Z"},
}},
{"3 minutes", time.Second * 60 * 3, 0, []interval{
{"3 minutes", time.Second * 60 * 3, 0, previousEndTime, []interval{
{"2022-08-24T10:57:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:03:00Z"},
{"2022-08-24T11:03:00Z", "2022-08-24T11:06:00Z"},
{"2022-08-24T11:06:00Z", "2022-08-24T11:09:00Z"},
{"2022-08-24T11:09:00Z", "2022-08-24T11:12:00Z"},
}},
{"5 minutes", time.Second * 60 * 5, 0, []interval{
{"5 minutes", time.Second * 60 * 5, 0, previousEndTime, []interval{
{"2022-08-24T10:55:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:05:00Z"},
{"2022-08-24T11:05:00Z", "2022-08-24T11:10:00Z"},
{"2022-08-24T11:10:00Z", "2022-08-24T11:15:00Z"},
{"2022-08-24T11:15:00Z", "2022-08-24T11:20:00Z"},
}},
{"30 minutes", time.Second * 60 * 30, 0, []interval{
{"30 minutes", time.Second * 60 * 30, 0, previousEndTime, []interval{
{"2022-08-24T10:30:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:30:00Z"},
{"2022-08-24T11:30:00Z", "2022-08-24T12:00:00Z"},
Expand All @@ -654,7 +671,7 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) {
}},

// with 90s latency (sanity check)
{"1 minute with 2 minute latency", time.Second * 60, time.Second * 90, []interval{
{"1 minute with 2 minute latency", time.Second * 60, time.Second * 90, previousEndTime, []interval{
{"2022-08-24T10:58:00Z", "2022-08-24T10:59:00Z"},
{"2022-08-24T10:59:00Z", "2022-08-24T11:00:00Z"},
{"2022-08-24T11:00:00Z", "2022-08-24T11:01:00Z"},
Expand All @@ -669,7 +686,7 @@ func TestGetStartTimeEndTime_AlwaysCreatesContinuousIntervals(t *testing.T) {
intervals := make([]interval, numCalls)
for i := range intervals {
adjustedStartTime := startTime.Add(tt.period * time.Duration(i))
start, end := GetStartTimeEndTime(adjustedStartTime, tt.period, tt.latency)
start, end := GetStartTimeEndTime(adjustedStartTime, tt.period, tt.latency, tt.previousEndTime)
intervals[i] = interval{start.Format(time.RFC3339), end.Format(time.RFC3339)}
}

Expand Down

0 comments on commit cef78ed

Please sign in to comment.