diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c46d61fa9023..7107b4197c10 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036] - Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019] +- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index 88f28e39e83e..c2d3b8446cc4 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -256,5 +256,6 @@ func (in *s3PollerInput) s3EventForState(state state) s3EventV2 { event.S3.Bucket.Name = state.Bucket event.S3.Bucket.ARN = in.config.getBucketARN() event.S3.Object.Key = state.Key + event.S3.Object.LastModified = state.LastModified return event } diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index c36bd7858f96..93cb0f262bdc 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -117,7 +117,8 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func p.eventCallback = eventCallback log = log.With( "bucket_arn", p.s3Obj.S3.Bucket.Name, - "object_key", p.s3Obj.S3.Object.Key) + "object_key", p.s3Obj.S3.Object.Key, + "last_modified", p.s3Obj.S3.Object.LastModified) // Metrics and Logging log.Debug("Begin S3 object processing.") @@ -454,7 +455,7 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event } if offset >= 0 { event.Fields.Put("log.offset", offset) - event.SetID(objectID(p.s3ObjHash, offset)) + event.SetID(objectID(p.s3Obj.S3.Object.LastModified, p.s3ObjHash, offset)) } if len(p.s3Metadata) > 0 { @@ -484,8 +485,8 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error { return nil } -func objectID(objectHash string, offset int64) string { - return fmt.Sprintf("%s-%012d", objectHash, offset) +func objectID(lastModified time.Time, objectHash string, offset int64) string { + return fmt.Sprintf("%d-%s-%012d", lastModified.UnixNano(), objectHash, offset) } // s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index e6178c33a87f..fe2ffd84e245 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strings" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -435,6 +436,13 @@ func TestNewMockS3Pager(t *testing.T) { assert.Equal(t, []string{"foo", "bar", "baz"}, keys) } +func Test_objectID(t *testing.T) { + lastModified, _ := time.Parse("2006-01-02 15:04:05 -0700", "2024-11-07 12:44:22 +0000") + objId := objectID(lastModified, "fe8a230c26", 42) + + assert.Equal(t, "1730983462000000000-fe8a230c26-000000000042", objId) +} + // newMockS3Pager returns a s3Pager that paginates the given s3Objects based on // the specified page size. It never returns an error. func newMockS3Pager(ctrl *gomock.Controller, pageSize int, s3Objects []types.Object) *MockS3Pager { diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index 884cf7adbbce..cb39376f7b47 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -77,7 +77,8 @@ type s3EventV2 struct { ARN string `json:"arn"` } `json:"bucket"` Object struct { - Key string `json:"key"` + Key string `json:"key"` + LastModified time.Time `json:"lastModified"` } `json:"object"` } `json:"s3"` } diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index d4b004980b8d..0e5441dcd438 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -540,7 +540,8 @@ func Test_StorageClient(t *testing.T) { var err error val, err = got.Fields.GetValue("message") assert.NoError(t, err) - assert.True(t, tt.expected[val.(string)]) + + assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")]) assert.Equal(t, tt.expectedError, err) receivedCount += 1 if receivedCount == len(tt.expected) { diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 8ff0576bdf5e..f8f4f006e870 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -13,6 +13,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -662,7 +663,7 @@ func Test_StorageClient(t *testing.T) { if !tt.checkJSON { val, err = got.Fields.GetValue("message") assert.NoError(t, err) - assert.True(t, tt.expected[val.(string)]) + assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")]) } else { val, err = got.Fields.GetValue("gcs.storage.object.json_data") fVal := fmt.Sprintf("%v", val)