Skip to content

Commit

Permalink
Set log.offset to the start of the reported line in filestream (#30445
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kvch authored Feb 17, 2022
1 parent 18f1e2b commit 8aca673
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
- Fix multiple instances of the same module configured within `filebeat.modules` in filebeat.yml. {issue}29649[29649] {pull}29952[29952]
- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131]
- Report the starting offset of the line in `log.offset` when using `filestream` instead of the end to be ECS compliant. {pull}30445[30445]

*Filebeat*
- Fix broken Kafka input {issue}29746[29746] {pull}30277[30277]
Expand Down
6 changes: 4 additions & 2 deletions libbeat/reader/readfile/metafields.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ func NewFilemeta(r reader.Reader, path string, offset int64) reader.Reader {
func (r *FileMetaReader) Next() (reader.Message, error) {
message, err := r.reader.Next()

r.offset += int64(message.Bytes)

// if the message is empty, there is no need to enrich it with file metadata
if message.IsEmpty() {
r.offset += int64(message.Bytes)
return message, err
}

Expand All @@ -56,6 +55,9 @@ func (r *FileMetaReader) Next() (reader.Message, error) {
},
},
})

r.offset += int64(message.Bytes)

return message, err
}

Expand Down
3 changes: 2 additions & 1 deletion libbeat/reader/readfile/metafields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestMetaFieldsOffset(t *testing.T) {
if err == io.EOF {
break
}
offset += int64(msg.Bytes)

expectedFields := common.MapStr{}
if len(msg.Content) != 0 {
Expand All @@ -67,6 +66,8 @@ func TestMetaFieldsOffset(t *testing.T) {
},
}
}
offset += int64(msg.Bytes)

require.Equal(t, expectedFields, msg.Fields)
require.Equal(t, offset, in.offset)
}
Expand Down

0 comments on commit 8aca673

Please sign in to comment.