Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/awss3: support for Access Point ARN #41495

Merged
merged 11 commits into from
Nov 19, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- Fix aws region in aws-s3 input s3 polling mode. {pull}41572[41572]
- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504]
- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]

*Heartbeat*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@
# Bucket ARN used for polling AWS S3 buckets
#bucket_arn: arn:aws:s3:::test-s3-bucket

# Access Point ARN used for polling AWS S3 buckets
#access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint

# Bucket Name used for polling non-AWS S3 buckets
#non_aws_bucket_name: test-s3-bucket

Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ configuring multiline options.
[float]
==== `queue_url`

URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set).
URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn`, `access_point_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `region`
Expand Down Expand Up @@ -472,7 +472,12 @@ value is `20s`.
[float]
==== `bucket_arn`

ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` and `non_aws_bucket_name` are not set).
ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url`, `access_point_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `access_point_arn`

ARN of the AWS S3 Access Point that will be polled for list operation. (Required when `queue_url`, `bucket_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `non_aws_bucket_name`
Expand All @@ -492,7 +497,7 @@ Prefix to apply for the list request to the S3 bucket. Default empty.
[float]
==== `number_of_workers`

Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` is set, otherwise (in the SQS case) defaults to 5.
Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` or `access_point_arn` is set, otherwise (in the SQS case) defaults to 5.


[float]
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,9 @@ filebeat.inputs:
# Bucket ARN used for polling AWS S3 buckets
#bucket_arn: arn:aws:s3:::test-s3-bucket

# Access Point ARN used for polling AWS S3 buckets
#access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint

# Bucket Name used for polling non-AWS S3 buckets
#non_aws_bucket_name: test-s3-bucket

Expand Down
37 changes: 29 additions & 8 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"errors"
"fmt"
"net/url"
"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -33,6 +34,7 @@
QueueURL string `config:"queue_url"`
RegionName string `config:"region"`
BucketARN string `config:"bucket_arn"`
AccessPointARN string `config:"access_point_arn"`
NonAWSBucketName string `config:"non_aws_bucket_name"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
Expand Down Expand Up @@ -61,28 +63,32 @@
}

func (c *config) Validate() error {
configs := []bool{c.QueueURL != "", c.BucketARN != "", c.NonAWSBucketName != ""}
configs := []bool{c.QueueURL != "", c.BucketARN != "", c.AccessPointARN != "", c.NonAWSBucketName != ""}
enabled := []bool{}
for i := range configs {
if configs[i] {
enabled = append(enabled, configs[i])
}
}
if len(enabled) == 0 {
return errors.New("neither queue_url, bucket_arn nor non_aws_bucket_name were provided")
return errors.New("neither queue_url, bucket_arn, access_point_arn, nor non_aws_bucket_name were provided")
} else if len(enabled) > 1 {
return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, non_aws_bucket_name <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN, c.NonAWSBucketName)
return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, access_point_arn <%v>, non_aws_bucket_name <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN, c.AccessPointARN, c.NonAWSBucketName)
}

if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 {
if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 {
return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval)
}

if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 {
if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

if c.AccessPointARN != "" && !isValidAccessPointARN(c.AccessPointARN) {
return fmt.Errorf("invalid format for access_point_arn <%v>", c.AccessPointARN)
}

if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) {
return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+
"less than or equal to 12h", c.VisibilityTimeout)
Expand Down Expand Up @@ -117,14 +123,15 @@
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" {
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" && c.AccessPointARN == "" {
return errors.New("backup to AWS bucket can only be used for AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" {
return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together")
}
if c.BackupConfig.GetBucketName() != "" && c.QueueURL == "" {
if (c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.BackupToBucketArn == c.BucketARN) ||
if (c.BackupConfig.BackupToBucketArn != "" &&
(c.BackupConfig.BackupToBucketArn == c.BucketARN || c.BackupConfig.BackupToBucketArn == c.AccessPointARN)) ||
(c.BackupConfig.NonAWSBackupToBucketName != "" && c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName) {
if c.BackupConfig.BackupToBucketPrefix == "" {
return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same")
Expand Down Expand Up @@ -233,6 +240,9 @@
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.AccessPointARN != "" {
return c.AccessPointARN
}
if c.BucketARN != "" {
return getBucketNameFromARN(c.BucketARN)
}
Expand All @@ -246,6 +256,9 @@
if c.BucketARN != "" {
return c.BucketARN
}
if c.AccessPointARN != "" {
return c.AccessPointARN
}
return ""
}

Expand Down Expand Up @@ -282,7 +295,7 @@
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)

Check failure on line 298 in x-pack/filebeat/input/awss3/config.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: o.EndpointResolver is deprecated: Deprecated: EndpointResolver and WithEndpointResolver. Providing a value for this field will likely prevent you from using any endpoint-related service features released after the introduction of EndpointResolverV2 and BaseEndpoint. (staticcheck)
}
}

Expand All @@ -292,3 +305,11 @@
}
return []fileSelectorConfig{{ReaderConfig: c.ReaderConfig}}
}

// isValidAccessPointARN reports whether arn has the shape of an S3 Access
// Point ARN, for example
// arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint.
//
// The check is purely structural: the string must have at least six
// colon-separated fields, and the sixth must be "accesspoint/" followed by a
// non-empty name. The partition, service, region and account fields are not
// inspected.
func isValidAccessPointARN(arn string) bool {
	const resourcePrefix = "accesspoint/"

	parts := strings.Split(arn, ":")
	if len(parts) < 6 {
		return false
	}

	// The resource must carry a name after the prefix; a bare "accesspoint/"
	// is rejected.
	resource := parts[5]
	return strings.HasPrefix(resource, resourcePrefix) && len(resource) > len(resourcePrefix)
}
Loading
Loading