Skip to content

Commit

Permalink
[input/awscloudwatch] Set startTime to 0 for the first iteration of r…
Browse files Browse the repository at this point in the history
…etrieving log events from CloudWatch (#40079) (#40274)

* set startTime to 0 for the first iteration of retrieving log events from CloudWatch

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* add pr link; fix description

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

* add test

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>

---------

Signed-off-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>
(cherry picked from commit c00345f)

Co-authored-by: Tetiana Kravchenko <tetiana.kravchenko@elastic.co>
  • Loading branch information
mergify[bot] and tetianakravchenko authored Jul 17, 2024
1 parent cd8b8a3 commit 8af2d57
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ https://github.com/elastic/beats/compare/v8.13.4\...v8.14.0[View commits]
- Fix concurrency/error handling bugs in the AWS S3 input that could drop data and prevent ingestion of large buckets. {pull}39131[39131]
- Fix EntraID query handling. {issue}39419[39419] {pull}39420[39420]
- Expand ID patterns in request trace logger for HTTP Endpoint. {pull}39656[39656]
- Fix awscloudwarch input: set startTime to `0` for the first iteration of retrieving log events from CloudWatch. {pull}40079[40079]

*Heartbeat*

Expand Down
13 changes: 11 additions & 2 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)),
EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)),
StartTime: awssdk.Int64(unixMsFromTime(startTime)),
EndTime: awssdk.Int64(unixMsFromTime(endTime)),
}

if len(p.config.LogStreams) > 0 {
Expand Down Expand Up @@ -185,3 +185,12 @@ func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string,
startTime, endTime = endTime, clock().Add(-p.config.Latency)
}
}

// unixMsFromTime converts time to unix milliseconds.
// Returns 0 both the init time `time.Time{}`, instead of -6795364578871
func unixMsFromTime(v time.Time) int64 {
if v.IsZero() {
return 0
}
return v.UnixNano() / int64(time.Millisecond)
}
36 changes: 36 additions & 0 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -205,3 +207,37 @@ func TestReceive(t *testing.T) {
cancel()
}
}

type filterLogEventsTestCase struct {
name string
logGroup string
startTime time.Time
endTime time.Time
expected *cloudwatchlogs.FilterLogEventsInput
}

func TestFilterLogEventsInput(t *testing.T) {
now, _ := time.Parse(time.RFC3339, "2024-07-12T13:00:00+00:00")
testCases := []filterLogEventsTestCase{
{
name: "StartPosition: beginning, first iteration",
logGroup: "a",
// The zero value of type time.Time{} is January 1, year 1, 00:00:00.000000000 UTC
// Events with a timestamp before the time - January 1, 1970, 00:00:00 UTC are not returned by AWS API
// make sure zero value of time.Time{} was converted
startTime: time.Time{},
endTime: now,
expected: &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String("a"),
StartTime: awssdk.Int64(0),
EndTime: awssdk.Int64(1720789200000),
},
},
}
for _, test := range testCases {
p := cloudwatchPoller{}
result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroup)
assert.Equal(t, test.expected, result)
}

}

0 comments on commit 8af2d57

Please sign in to comment.