From 74bfc789f1acc48fe5126bbbfa9f274d557e5d71 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Tue, 17 Dec 2024 12:35:46 +0200 Subject: [PATCH 01/12] [x-pack][filebeat] incorporate LastModified into s3 event _id generation --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/awss3/s3_input.go | 1 + x-pack/filebeat/input/awss3/s3_objects.go | 1 + x-pack/filebeat/input/awss3/sqs_s3_event.go | 3 ++- 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 50442ba2f5c..b6be6524efc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -371,6 +371,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] +- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}0[0] *Auditbeat* diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index 88f28e39e83..c2d3b8446cc 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -256,5 +256,6 @@ func (in *s3PollerInput) s3EventForState(state state) s3EventV2 { event.S3.Bucket.Name = state.Bucket event.S3.Bucket.ARN = in.config.getBucketARN() event.S3.Object.Key = state.Key + event.S3.Object.LastModified = state.LastModified return event } diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index acd4d173439..3bd1c23a120 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -486,6 +486,7 @@ func s3ObjectHash(obj s3EventV2) string { h := sha256.New() h.Write([]byte(obj.S3.Bucket.ARN)) h.Write([]byte(obj.S3.Object.Key)) + h.Write([]byte(obj.S3.Object.LastModified.String())) prefix := hex.EncodeToString(h.Sum(nil)) return prefix[:10] } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index 884cf7adbbc..cb39376f7b4 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -77,7 +77,8 @@ type s3EventV2 struct { ARN string `json:"arn"` } `json:"bucket"` Object struct { - Key string `json:"key"` + Key string `json:"key"` + LastModified time.Time `json:"lastModified"` } `json:"object"` } `json:"s3"` } From 68de656403cea5b26e1fbf1434c3bd85d4d7c295 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Tue, 17 Dec 2024 12:39:36 +0200 Subject: [PATCH 02/12] update PR id in changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b6be6524efc..e2f5db22eea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -371,7 +371,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] -- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}0[0] +- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] *Auditbeat* From 30bc1f4a482246a4dba78f74b9174400074171e1 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Tue, 17 Dec 2024 13:56:59 +0200 Subject: [PATCH 03/12] log LastModified value --- x-pack/filebeat/input/awss3/s3_objects.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 3bd1c23a120..01f2f236d56 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -117,7 +117,8 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func p.eventCallback = eventCallback log = log.With( "bucket_arn", p.s3Obj.S3.Bucket.Name, - "object_key", p.s3Obj.S3.Object.Key) + "object_key", p.s3Obj.S3.Object.Key, + "last_modified", p.s3Obj.S3.Object.LastModified) // Metrics and Logging log.Debug("Begin S3 object processing.") From 3041ff46fbd4412a47d223cae30245310b8bdc6f Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Tue, 17 Dec 2024 14:31:41 +0200 Subject: [PATCH 04/12] add unit test --- .../filebeat/input/awss3/s3_objects_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index e6178c33a87..84b31d676b6 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strings" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -435,6 +436,33 @@ func TestNewMockS3Pager(t *testing.T) { assert.Equal(t, []string{"foo", "bar", "baz"}, keys) } +// Test_s3ObjectHash verifies that s3ObjectHash function produces correct value +func Test_s3ObjectHash(t *testing.T) { + lastModified, _ := time.Parse("2006-01-02 15:04:05 -0700", "2022-10-07 12:44:22 +0530") + objHash := s3ObjectHash(s3EventV2{ + S3: struct { + Bucket struct { + Name string `json:"name"` + ARN string `json:"arn"` + } `json:"bucket"` + Object struct { + Key string `json:"key"` + LastModified time.Time `json:"lastModified"` + } `json:"object"` + }{ + Bucket: struct { + Name string `json:"name"` + ARN string `json:"arn"` + }{Name: "test", ARN: "arn:aws:s3:::test"}, + Object: struct { + Key string `json:"key"` + LastModified time.Time `json:"lastModified"` + }{Key: "test-obj", LastModified: lastModified}, + }, + }) + assert.Equal(t, "fe8a230c26", objHash) +} + // newMockS3Pager returns a s3Pager that paginates the given s3Objects based on // the specified page size. It never returns an error. func newMockS3Pager(ctrl *gomock.Controller, pageSize int, s3Objects []types.Object) *MockS3Pager { From 32a69669319331cc5e65306499f1c48bc1c8abb3 Mon Sep 17 00:00:00 2001 From: Stefan Stas Date: Tue, 17 Dec 2024 15:10:32 +0000 Subject: [PATCH 05/12] fix unit tests on windows --- x-pack/filebeat/input/awss3/s3_objects_test.go | 5 +++++ x-pack/filebeat/input/azureblobstorage/input_test.go | 3 ++- x-pack/filebeat/input/gcs/input_test.go | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 84b31d676b6..74b77d21cb9 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -11,6 +11,7 @@ import ( "io" "os" "path/filepath" + "runtime" "strings" "testing" "time" @@ -319,6 +320,10 @@ func TestProcessObjectMetricCollection(t *testing.T) { }, } + if runtime.GOOS == "windows" { + tests[0].objectSize = 20 + } + for _, test := range tests { t.Run(test.name, func(t *testing.T) { // given diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index d4b004980b8..62e23fa94a3 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -540,7 +540,8 @@ func Test_StorageClient(t *testing.T) { var err error val, err = got.Fields.GetValue("message") assert.NoError(t, err) - assert.True(t, tt.expected[val.(string)]) + + assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r", "")]) assert.Equal(t, tt.expectedError, err) receivedCount += 1 if receivedCount == len(tt.expected) { diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 8ff0576bdf5..f2b307a2a01 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -13,6 +13,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -662,7 +663,7 @@ func Test_StorageClient(t *testing.T) { if !tt.checkJSON { val, err = got.Fields.GetValue("message") assert.NoError(t, err) - assert.True(t, tt.expected[val.(string)]) + assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r", "")]) } else { val, err = got.Fields.GetValue("gcs.storage.object.json_data") fVal := fmt.Sprintf("%v", val) From 94c1054c4faffc5b2465064414fd0734463abea2 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Tue, 17 Dec 2024 17:44:50 +0200 Subject: [PATCH 06/12] use UnixMilli instead of time string --- x-pack/filebeat/input/awss3/s3_objects.go | 2 +- x-pack/filebeat/input/awss3/s3_objects_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 01f2f236d56..5310fb6f241 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -487,7 +487,7 @@ func s3ObjectHash(obj s3EventV2) string { h := sha256.New() h.Write([]byte(obj.S3.Bucket.ARN)) h.Write([]byte(obj.S3.Object.Key)) - h.Write([]byte(obj.S3.Object.LastModified.String())) + h.Write([]byte(fmt.Sprintf("%d", obj.S3.Object.LastModified.UnixMilli()))) prefix := hex.EncodeToString(h.Sum(nil)) return prefix[:10] } diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 74b77d21cb9..be6075ec50a 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -465,7 +465,7 @@ func Test_s3ObjectHash(t *testing.T) { }{Key: "test-obj", LastModified: lastModified}, }, }) - assert.Equal(t, "fe8a230c26", objHash) + assert.Equal(t, "414903f3f1", objHash) } // newMockS3Pager returns a s3Pager that paginates the given s3Objects based on From 64af26c346d1678c2e903732f2a6c5cb391607f9 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Tue, 17 Dec 2024 18:05:21 +0200 Subject: [PATCH 07/12] change s3 event _id format --- x-pack/filebeat/input/awss3/s3_objects.go | 7 ++--- .../filebeat/input/awss3/s3_objects_test.go | 28 ------------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 5310fb6f241..19d29109fda 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -449,7 +449,7 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event }, }, } - event.SetID(objectID(p.s3ObjHash, offset)) + event.SetID(objectID(p.s3Obj.S3.Object.LastModified, p.s3ObjHash, offset)) if len(p.s3Metadata) > 0 { _, _ = event.Fields.Put("aws.s3.metadata", p.s3Metadata) @@ -478,8 +478,8 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error { return nil } -func objectID(objectHash string, offset int64) string { - return fmt.Sprintf("%s-%012d", objectHash, offset) +func objectID(lastModified time.Time, objectHash string, offset int64) string { + return fmt.Sprintf("%d-%s-%012d", lastModified.UnixNano(), objectHash, offset) } // s3ObjectHash returns a short sha256 hash of the bucket arn + object key name. @@ -487,7 +487,6 @@ func s3ObjectHash(obj s3EventV2) string { h := sha256.New() h.Write([]byte(obj.S3.Bucket.ARN)) h.Write([]byte(obj.S3.Object.Key)) - h.Write([]byte(fmt.Sprintf("%d", obj.S3.Object.LastModified.UnixMilli()))) prefix := hex.EncodeToString(h.Sum(nil)) return prefix[:10] } diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index be6075ec50a..bf5cf4c3cea 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -14,7 +14,6 @@ import ( "runtime" "strings" "testing" - "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -441,33 +440,6 @@ func TestNewMockS3Pager(t *testing.T) { assert.Equal(t, []string{"foo", "bar", "baz"}, keys) } -// Test_s3ObjectHash verifies that s3ObjectHash function produces correct value -func Test_s3ObjectHash(t *testing.T) { - lastModified, _ := time.Parse("2006-01-02 15:04:05 -0700", "2022-10-07 12:44:22 +0530") - objHash := s3ObjectHash(s3EventV2{ - S3: struct { - Bucket struct { - Name string `json:"name"` - ARN string `json:"arn"` - } `json:"bucket"` - Object struct { - Key string `json:"key"` - LastModified time.Time `json:"lastModified"` - } `json:"object"` - }{ - Bucket: struct { - Name string `json:"name"` - ARN string `json:"arn"` - }{Name: "test", ARN: "arn:aws:s3:::test"}, - Object: struct { - Key string `json:"key"` - LastModified time.Time `json:"lastModified"` - }{Key: "test-obj", LastModified: lastModified}, - }, - }) - assert.Equal(t, "414903f3f1", objHash) -} - // newMockS3Pager returns a s3Pager that paginates the given s3Objects based on // the specified page size. It never returns an error. func newMockS3Pager(ctrl *gomock.Controller, pageSize int, s3Objects []types.Object) *MockS3Pager { From c642bc35fc05d1d43152d22a42cac05ea60f654d Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Wed, 18 Dec 2024 11:37:57 +0200 Subject: [PATCH 08/12] fix changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 64480fea1d9..86518fdf758 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -197,6 +197,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036] - Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019] +- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] *Heartbeat* @@ -373,7 +374,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] -- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] *Auditbeat* From 47d432a51c48154c6b628908d0bd6d70b2090b10 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Wed, 18 Dec 2024 11:39:08 +0200 Subject: [PATCH 09/12] Update x-pack/filebeat/input/gcs/input_test.go Co-authored-by: Dan Kortschak --- x-pack/filebeat/input/gcs/input_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index f2b307a2a01..f8f4f006e87 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -663,7 +663,7 @@ func Test_StorageClient(t *testing.T) { if !tt.checkJSON { val, err = got.Fields.GetValue("message") assert.NoError(t, err) - assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r", "")]) + assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")]) } else { val, err = got.Fields.GetValue("gcs.storage.object.json_data") fVal := fmt.Sprintf("%v", val) From b01f431af8bb0517bccdf5bd61cdde95b000407c Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Wed, 18 Dec 2024 11:39:21 +0200 Subject: [PATCH 10/12] Update x-pack/filebeat/input/azureblobstorage/input_test.go Co-authored-by: Dan Kortschak --- x-pack/filebeat/input/azureblobstorage/input_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/azureblobstorage/input_test.go b/x-pack/filebeat/input/azureblobstorage/input_test.go index 62e23fa94a3..0e5441dcd43 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_test.go +++ b/x-pack/filebeat/input/azureblobstorage/input_test.go @@ -541,7 +541,7 @@ func Test_StorageClient(t *testing.T) { val, err = got.Fields.GetValue("message") assert.NoError(t, err) - assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r", "")]) + assert.True(t, tt.expected[strings.ReplaceAll(val.(string), "\r\n", "\n")]) assert.Equal(t, tt.expectedError, err) receivedCount += 1 if receivedCount == len(tt.expected) { From d623d34b81be86f75d4cc3a8fe446220a10f0607 Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Thu, 19 Dec 2024 12:26:50 +0200 Subject: [PATCH 11/12] revert windows test fix --- x-pack/filebeat/input/awss3/s3_objects_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index bf5cf4c3cea..e6178c33a87 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -11,7 +11,6 @@ import ( "io" "os" "path/filepath" - "runtime" "strings" "testing" @@ -319,10 +318,6 @@ func TestProcessObjectMetricCollection(t *testing.T) { }, } - if runtime.GOOS == "windows" { - tests[0].objectSize = 20 - } - for _, test := range tests { t.Run(test.name, func(t *testing.T) { // given From 32c0dd45c5dcd0412310cd89d6ce5f4d0cd58dae Mon Sep 17 00:00:00 2001 From: stefans-elastic Date: Thu, 19 Dec 2024 16:07:51 +0200 Subject: [PATCH 12/12] add unit test --- x-pack/filebeat/input/awss3/s3_objects_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index e6178c33a87..fe2ffd84e24 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strings" "testing" + "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -435,6 +436,13 @@ func TestNewMockS3Pager(t *testing.T) { assert.Equal(t, []string{"foo", "bar", "baz"}, keys) } +func Test_objectID(t *testing.T) { + lastModified, _ := time.Parse("2006-01-02 15:04:05 -0700", "2024-11-07 12:44:22 +0000") + objId := objectID(lastModified, "fe8a230c26", 42) + + assert.Equal(t, "1730983462000000000-fe8a230c26-000000000042", objId) +} + // newMockS3Pager returns a s3Pager that paginates the given s3Objects based on // the specified page size. It never returns an error. func newMockS3Pager(ctrl *gomock.Controller, pageSize int, s3Objects []types.Object) *MockS3Pager {