Skip to content

Commit

Permalink
[x-pack][filebeat] incorporate LastModified into s3 event _id generat…
Browse files Browse the repository at this point in the history
…ion (#42078)

* [x-pack][filebeat] incorporate LastModified into s3 event _id generation

* update PR id in changelog

* log LastModified value

* add unit test

* fix unit tests on windows

* use UnixMilli instead of time string

* change s3 event _id format

* fix changelog

* Update x-pack/filebeat/input/gcs/input_test.go

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>

* Update x-pack/filebeat/input/azureblobstorage/input_test.go

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>

* revert windows test fix

* add unit test

---------

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
  • Loading branch information
stefans-elastic and efd6 authored Dec 20, 2024
1 parent 20a1776 commit 58a5369
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]
- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078]
- Fix Netflow Template Sharing configuration handling. {pull}42080[42080]

*Heartbeat*
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,6 @@ func (in *s3PollerInput) s3EventForState(state state) s3EventV2 {
event.S3.Bucket.Name = state.Bucket
event.S3.Bucket.ARN = in.config.getBucketARN()
event.S3.Object.Key = state.Key
event.S3.Object.LastModified = state.LastModified
return event
}
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
p.eventCallback = eventCallback
log = log.With(
"bucket_arn", p.s3Obj.S3.Bucket.Name,
"object_key", p.s3Obj.S3.Object.Key)
"object_key", p.s3Obj.S3.Object.Key,
"last_modified", p.s3Obj.S3.Object.LastModified)

// Metrics and Logging
log.Debug("Begin S3 object processing.")
Expand Down Expand Up @@ -454,7 +455,7 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event
}
if offset >= 0 {
event.Fields.Put("log.offset", offset)
event.SetID(objectID(p.s3ObjHash, offset))
event.SetID(objectID(p.s3Obj.S3.Object.LastModified, p.s3ObjHash, offset))
}

if len(p.s3Metadata) > 0 {
Expand Down Expand Up @@ -484,8 +485,8 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error {
return nil
}

func objectID(objectHash string, offset int64) string {
return fmt.Sprintf("%s-%012d", objectHash, offset)
func objectID(lastModified time.Time, objectHash string, offset int64) string {
return fmt.Sprintf("%d-%s-%012d", lastModified.UnixNano(), objectHash, offset)
}

// s3ObjectHash returns a short sha256 hash of the bucket arn + object key name.
Expand Down
8 changes: 8 additions & 0 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
Expand Down Expand Up @@ -435,6 +436,13 @@ func TestNewMockS3Pager(t *testing.T) {
assert.Equal(t, []string{"foo", "bar", "baz"}, keys)
}

func Test_objectID(t *testing.T) {
lastModified, _ := time.Parse("2006-01-02 15:04:05 -0700", "2024-11-07 12:44:22 +0000")
objId := objectID(lastModified, "fe8a230c26", 42)

assert.Equal(t, "1730983462000000000-fe8a230c26-000000000042", objId)
}

// newMockS3Pager returns a s3Pager that paginates the given s3Objects based on
// the specified page size. It never returns an error.
func newMockS3Pager(ctrl *gomock.Controller, pageSize int, s3Objects []types.Object) *MockS3Pager {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/awss3/sqs_s3_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ type s3EventV2 struct {
ARN string `json:"arn"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
Key string `json:"key"`
LastModified time.Time `json:"lastModified"`
} `json:"object"`
} `json:"s3"`
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/azureblobstorage/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,8 @@ func Test_StorageClient(t *testing.T) {
var err error
val, err = got.Fields.GetValue("message")
assert.NoError(t, err)
assert.True(t, tt.expected[val.(string)])

assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")])
assert.Equal(t, tt.expectedError, err)
receivedCount += 1
if receivedCount == len(tt.expected) {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/gcs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -662,7 +663,7 @@ func Test_StorageClient(t *testing.T) {
if !tt.checkJSON {
val, err = got.Fields.GetValue("message")
assert.NoError(t, err)
assert.True(t, tt.expected[val.(string)])
assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")])
} else {
val, err = got.Fields.GetValue("gcs.storage.object.json_data")
fVal := fmt.Sprintf("%v", val)
Expand Down

0 comments on commit 58a5369

Please sign in to comment.