Skip to content

Commit

Permalink
[7.x](backport #28252) [Filebeat] - S3 Input - Add support for only i…
Browse files Browse the repository at this point in the history
…terating/accessing only… (#28369)

* [Filebeat] - S3 Input - Add support for only iterating/accessing only specific folders or datapaths

(cherry picked from commit 764045e)


Co-authored-by: Andrea Spacca <andrea.spacca@elastic.co>
  • Loading branch information
mergify[bot] and Andrea Spacca authored Oct 14, 2021
1 parent e485a88 commit 5a78bac
Show file tree
Hide file tree
Showing 28 changed files with 195 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716]
- Improve Cisco ASA/FTD parsing of messages - better support for identity FW messages. Change network.bytes, source.bytes, and destination.bytes to long from integer since value can exceed integer capacity. Add descriptions for various processors for easier pipeline editing in Kibana UI. {pull}23766[23766]
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Add support for passing a prefix on S3 bucket list mode for AWS-S3 input {pull}28252[28252] {issue}27965[27965]
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]
- Add support for username in cisco asa security negotiation logs {pull}26975[26975]
- Relax time parsing and capture group and session type in Cisco ASA module {issue}24710[24710] {pull}28325[28325]
Expand Down
10 changes: 10 additions & 0 deletions filebeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -67,6 +68,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -84,6 +86,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -101,6 +104,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -118,6 +122,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand All @@ -135,6 +140,7 @@ Example config:
enabled: false
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
#var.bucket_arn: 'arn:aws:s3:::mybucket'
#var.bucket_list_prefix: 'prefix'
#var.bucket_list_interval: 300s
#var.number_of_workers: 5
#var.shared_credential_file: /etc/filebeat/aws_credentials
Expand Down Expand Up @@ -178,6 +184,10 @@ Use to vertically scale the input.

Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds.

*`var.bucket_list_prefix`*::

Prefix to apply for the list request to the S3 bucket. Default empty.

*`var.endpoint`*::

Custom endpoint used to access AWS APIs.
Expand Down
18 changes: 18 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -166,6 +169,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -215,6 +221,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -264,6 +273,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -313,6 +325,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down Expand Up @@ -362,6 +377,9 @@ filebeat.modules:
# AWS S3 bucket arn
#var.bucket_arn: 'arn:aws:s3:::mybucket'

# AWS S3 list prefix
#var.bucket_list_prefix: 'prefix'

# Bucket list interval on S3 bucket
#var.bucket_list_interval: 300s

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type config struct {
QueueURL string `config:"queue_url"`
BucketARN string `config:"bucket_arn"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
Expand All @@ -40,6 +41,7 @@ func defaultConfig() config {
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
BucketListInterval: 120 * time.Second,
BucketListPrefix: "",
SQSWaitTime: 20 * time.Second,
SQSMaxReceiveCount: 5,
FIPSEnabled: false,
Expand Down
16 changes: 9 additions & 7 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func TestConfig(t *testing.T) {
parserConf := parser.Config{}
require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom("")))
return config{
QueueURL: quequeURL,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
SQSWaitTime: 20 * time.Second,
BucketListInterval: 120 * time.Second,
QueueURL: quequeURL,
BucketARN: s3Bucket,
APITimeout: 120 * time.Second,
VisibilityTimeout: 300 * time.Second,
SQSMaxReceiveCount: 5,
SQSWaitTime: 20 * time.Second,
BucketListInterval: 120 * time.Second,
BucketListPrefix: "",

FIPSEnabled: false,
MaxNumberOfMessages: 5,
ReaderConfig: readerConfig{
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

Expand All @@ -233,6 +234,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
states,
persistentStore,
in.config.BucketARN,
in.config.BucketListPrefix,
in.awsConfig.Region,
in.config.NumberOfWorkers,
in.config.BucketListInterval)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) ListObjectsPaginator(bucket string) s3Pager {
func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
return c.pagerConstant
}

Expand Down Expand Up @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second)

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)
Expand Down
49 changes: 49 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,52 @@ func TestGetRegionForBucketARN(t *testing.T) {
regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName)
assert.Equal(t, tfConfig.AWSRegion, regionName)
}

func TestPaginatorListPrefix(t *testing.T) {
logp.TestingSetup()

// Terraform is used to setup S3 and must be executed manually.
tfConfig := getTerraformOutputs(t)

uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName,
"testdata/events-array.json",
"testdata/invalid.json",
"testdata/log.json",
"testdata/log.ndjson",
"testdata/multiline.json",
"testdata/multiline.json.gz",
"testdata/multiline.txt",
"testdata/log.txt", // Skipped (no match).
)

awsConfig, err := external.LoadDefaultAWSConfig()
awsConfig.Region = tfConfig.AWSRegion
if err != nil {
t.Fatal(err)
}

s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig))

s3API := &awsS3API{
client: s3Client,
}

var objects []string
paginator := s3API.ListObjectsPaginator(tfConfig.BucketName, "log")
for paginator.Next(context.Background()) {
page := paginator.CurrentPage()
for _, object := range page.Contents {
objects = append(objects, *object.Key)
}
}

assert.NoError(t, paginator.Err())

expected := []string{
"log.json",
"log.ndjson",
"log.txt",
}

assert.Equal(t, expected, objects)
}
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type s3Getter interface {
}

type s3Lister interface {
ListObjectsPaginator(bucket string) s3Pager
ListObjectsPaginator(bucket, prefix string) s3Pager
}

type s3Pager interface {
Expand Down Expand Up @@ -204,9 +204,10 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
return resp, nil
}

func (a *awsS3API) ListObjectsPaginator(bucket string) s3Pager {
func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
req := a.client.ListObjectsRequest(&s3.ListObjectsInput{
Bucket: awssdk.String(bucket),
Prefix: awssdk.String(prefix),
})

pager := s3.NewListObjectsPaginator(req)
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type s3ObjectPayload struct {
type s3Poller struct {
numberOfWorkers int
bucket string
listPrefix string
region string
bucketPollInterval time.Duration
workerSem *sem
Expand All @@ -61,6 +62,7 @@ func newS3Poller(log *logp.Logger,
states *states,
store *statestore.Store,
bucket string,
listPrefix string,
awsRegion string,
numberOfWorkers int,
bucketPollInterval time.Duration) *s3Poller {
Expand All @@ -70,6 +72,7 @@ func newS3Poller(log *logp.Logger,
return &s3Poller{
numberOfWorkers: numberOfWorkers,
bucket: bucket,
listPrefix: listPrefix,
region: awsRegion,
bucketPollInterval: bucketPollInterval,
workerSem: newSem(numberOfWorkers),
Expand Down Expand Up @@ -142,7 +145,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<-
bucketMetadata := strings.Split(p.bucket, ":")
bucketName := bucketMetadata[len(bucketMetadata)-1]

paginator := p.s3.ListObjectsPaginator(bucketName)
paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix)
for paginator.Next(ctx) {
listingID, err := uuid.NewV4()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ func TestNewMockS3Pager(t *testing.T) {
defer ctrl.Finish()
mockS3Pager := newMockS3Pager(ctrl, 1, fakeObjects)
mockS3API := NewMockS3API(ctrl)
mockS3API.EXPECT().ListObjectsPaginator(gomock.Any()).Return(mockS3Pager)
mockS3API.EXPECT().ListObjectsPaginator(gomock.Any(), "").Return(mockS3Pager)

// Test the mock.
var keys []string
pager := mockS3API.ListObjectsPaginator("nombre")
pager := mockS3API.ListObjectsPaginator("nombre", "")
for pager.Next(ctx) {
for _, s3Obj := range pager.CurrentPage().Contents {
keys = append(keys, *s3Obj.Key)
Expand Down
Loading

0 comments on commit 5a78bac

Please sign in to comment.