-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Filebeat] Add option for S3 input to work without SQS notification #27332
[Filebeat] Add option for S3 input to work without SQS notification #27332
Conversation
Pinging @elastic/integrations (Team:Integrations) |
💚 Build Succeeded
Expand to view the summary
Build stats
Test stats 🧪
Trends 🧪💚 Flaky test reportTests succeeded. Expand to view the summary
Test stats 🧪
|
|
Pinging @elastic/security-external-integrations (Team:Security-External Integrations) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. My comments are all minor things and a few questions.
For other reviewers - please look over states.go. I didn't spend much time on that.
x-pack/filebeat/input/awss3/acker.go
Outdated
@@ -28,8 +28,8 @@ func newEventACKTracker(ctx context.Context) *eventACKTracker { | |||
return &eventACKTracker{ctx: ctx, cancel: cancel} | |||
} | |||
|
|||
// Add increments the number of pending ACKs by the specified amount. | |||
func (a *eventACKTracker) Add(messageCount int64) { | |||
// Add increments the number of pending ACKs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Add increments the number of pending ACKs | |
// Add increments the number of pending ACKs. |
QueueURL string `config:"queue_url"` | ||
S3Bucket string `config:"s3_bucket"` | ||
S3BucketPollInterval time.Duration `config:"s3_bucket_poll_interval"` | ||
S3BucketNumberOfWorkers int `config:"s3_bucket_number_of_workers"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking about the config names. How about s3.bucket
and s3.bucket_list_interval
(I think "list" might be more descriptive than "poll")?
On the s3_bucket_number_of_workers
, thinking about the future, perhaps we should call this number_of_workers
. Then if we do make the changes you mentioned to SQS we could use this option for both.
x-pack/filebeat/input/awss3/state.go
Outdated
Bucket string `json:"bucket" struct:"bucket"` | ||
Key string `json:"key" struct:"key"` | ||
Etag string `json:"etag" struct:"etag"` | ||
LastModified time.Time `json:"last_modified" struct:"last_modifed"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LastModified time.Time `json:"last_modified" struct:"last_modifed"` | |
LastModified time.Time `json:"last_modified" struct:"last_modified"` |
|
||
previousState := s.FindPrevious(state) | ||
|
||
// status is forget. if there is no previous state and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// status is forget. if there is no previous state and | |
// status is forgotten. if there is no previous state and |
// No existing state found, add new one | ||
s.idx[id] = len(s.states) | ||
s.states = append(s.states, newState) | ||
logp.Debug("input", "New state added for %s", newState.Id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is deprecated. Prefer creating a logger per instance, but if you must then use lopg.L().Debugf
.
return true, nil | ||
} | ||
|
||
// try to decode. Ingore faulty/incompatible values. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// try to decode. Ingore faulty/incompatible values. | |
// try to decode. Ignore faulty/incompatible values. |
@andrewkroh
As soon there's an even number of Similarly to what I reported I the other PR: we should clarify that That's what we do for both input method so it's not a problem. |
The godocs should be clarified that the eventACKTracker cannot be reused to wait for multiple independent sets of ACKs. But we could also change the implementation to allow for reuse if this is what you need. Do you need that? |
I don't need it, just wanted to be sure I didn't overlook the limitation |
/test |
…ithout-SQS-notification-refactored
@andrewkroh I should have addressed all the comments, please check, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
filebeat/docs/modules/aws.asciidoc
Outdated
|
||
Interval between list requests to the S3 bucket. Default to be 120 seconds. | ||
Wait interval between completion of a list request to the S3 bucket and beginning of the nest one. Default to be 120 seconds. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait interval between completion of a list request to the S3 bucket and beginning of the nest one. Default to be 120 seconds. | |
Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. |
s3ObjectsListedTotal *monitoring.Uint // Number of S3 objects listed. | ||
s3ObjectsProcessedTotal *monitoring.Uint // Number of S3 objects processed. | ||
// s3ObjectsListedTotal is the number of S3 objects processed that were fully ACKed. | ||
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects fully ACKed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one has two godocs.
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects fully ACKed. | |
s3ObjectsAckedTotal *monitoring.Uint |
filebeat/docs/modules/aws.asciidoc
Outdated
@@ -44,6 +49,9 @@ Example config: | |||
cloudtrail: | |||
enabled: false | |||
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue | |||
#var.bucket: 'arn:aws:s3:::mybucket |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#var.bucket: 'arn:aws:s3:::mybucket | |
#var.bucket: 'arn:aws:s3:::mybucket' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and same for the other sections below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides the missing quote, everything else looks great. Thanks!
/test |
What does this PR do?
Introduces an alternative polling method to ingest logs in S3 buckets through buckets listing instead of SQS notifications
Why is it important?
Some users are prevented to attach SQS notifications to S3 buckets and this enhancement will allow them to use Filebeat to ingest logs from the buckets anyway
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Author's Checklist
How to test this PR locally
-d *
)Related issues
Closes #18205
Use cases
Screenshots
Logs