Skip to content

Commit

Permalink
aws-s3 input default content-type
Browse files Browse the repository at this point in the history
- new option `content_type`
- can be set at input or file selector level
- overrides Content-Type that was given to the S3 object when it was
  uploaded.

Closes #25697
  • Loading branch information
leehinman committed May 18, 2021
1 parent 5233fb0 commit 87f4f26
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add new grok pattern for iptables module for Ubiquiti UDM {issue}25615[25615] {pull}25616[25616]
- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]
- Add monitoring metrics to the `aws-s3` input. {pull}25711[25711]
- Add Content-Type override to aws-s3 input. {issue}25697[25697] {pull}25772[25772]

*Heartbeat*

Expand Down
33 changes: 21 additions & 12 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ seconds. The maximum is half of the visibility timeout value.
The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs. The default is `16 KiB`.

[id="input-{type}-content_type"]
[float]
==== `content_type`

A standard MIME type describing the format of the object data. This
can be set to override the MIME type that was given to the object when
it was uploaded. example: `application/json`

[id="input-{type}-encoding"]
[float]
==== `encoding`
Expand Down Expand Up @@ -94,18 +102,19 @@ Content type will not be checked. If a file has "application/json" content-type,
[float]
==== `file_selectors`

If the SQS queue will have events that correspond to files that {beatname_uc}
shouldn't process `file_selectors` can be used to limit the files that are
downloaded. This is a list of selectors which are made up of `regex` and
`expand_event_list_from_field` options. The `regex` should match the S3 object
key in the SQS message, and the optional `expand_event_list_from_field` is the
same as the global setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones specified
in the `file_selectors`. Regex syntax is the same as the Go language. Files
that don't match one of the regexes won't be processed.
<<input-aws-s3-multiline>>, <<input-aws-s3-max_bytes>>,
<<input-aws-s3-buffer_size>>, and <<input-aws-s3-encoding>> may also be set for
each file selector.
If the SQS queue will have events that correspond to files that
{beatname_uc} shouldn't process `file_selectors` can be used to limit
the files that are downloaded. This is a list of selectors which are
made up of `regex` and `expand_event_list_from_field` options. The
`regex` should match the S3 object key in the SQS message, and the
optional `expand_event_list_from_field` is the same as the global
setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed. <<input-aws-s3-content_type>>, <<input-aws-s3-multiline>>,
<<input-aws-s3-max_bytes>>, <<input-aws-s3-buffer_size>>, and
<<input-aws-s3-encoding>> may also be set for each file selector.

["source", "yml"]
----
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
bodyReader = bufio.NewReader(gzipReader)
}

if info.readerConfig.ContentType != "" {
*resp.ContentType = info.readerConfig.ContentType
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" {
decoder := json.NewDecoder(bodyReader)
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type readerConfig struct {
Multiline *multiline.Config `config:"multiline"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
ContentType string `config:"content_type"`
}

func (f *readerConfig) Validate() error {
Expand All @@ -82,6 +83,9 @@ func (f *readerConfig) Validate() error {
if f.MaxBytes <= 0 {
return fmt.Errorf("max_bytes <%v> must be greater than 0", f.MaxBytes)
}
if f.ExpandEventListFromField != "" && f.ContentType != "" && f.ContentType != "application/json" {
return fmt.Errorf("content_type must be `application/json` to be used with expand_event_list_from_field")
}

return nil
}
Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ func TestConfig(t *testing.T) {
"max_bytes <0> must be greater than 0",
nil,
},
{
"error on expand_event_list_from_field and content_type != application/json ",
common.MapStr{
"queue_url": queueURL,
"expand_event_list_from_field": "Records",
"content_type": "text/plain",
},
"content_type must be `application/json` to be used with expand_event_list_from_field",
nil,
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit 87f4f26

Please sign in to comment.