Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws-s3 input: Split S3 poller and SQS reader into explicit input objects #39353

Merged
merged 87 commits into from
May 9, 2024

Conversation

faec
Copy link
Contributor

@faec faec commented May 1, 2024

A large cleanup in the aws-s3 input, reorganizing the file structure and splitting internal APIs into additional helpers.

This change is meant to have no functional effect, it is strictly a cleanup and reorganization in preparation for future changes. The hope is that the new layout makes initialization steps and logical dependencies clearer. The main changes are:

  • Make s3Poller and sqsReader into standalone input objects, s3PollerInput and sqsReaderInput, that implement the v2.Input interface, instead of interleaving the two implementations within the same object.
    • Choose the appropriate input in (*s3InputManager).Create based on configuration
    • Move associated internal API out of the shared input.go into the new s3_input.go and sqs_input.go, while leaving s3.go and sqs.go for auxiliary helpers.
    • Give each input a copy of config and awsConfig, and remove redundant struct fields that simply shadowed fields already in those configs.
  • In sqsReaderInput, use a fixed set of worker goroutines and track task allocation via channel-based work requests instead of creating ephemeral workers via the previous custom semaphore implementation (similar to the recent cloudwatch cleanup).
    • Delete aws.Sem, since this was its last remaining caller
  • Collect the helpers related to approximate message count polling into a helper object, messageCountMonitor, so their role in the input is clearer.
  • Generally, break larger steps up into smaller helper functions
  • Generally, collect initialization dependencies in the same place so the sequencing is clearer.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

@pierrehilbert pierrehilbert added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label May 3, 2024
@elasticmachine
Copy link
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

Copy link
Contributor

@leehinman leehinman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code changes look fine to me.

Can you add to the description what tests were done to verify that changes keep the same behavior?

I know there are multiple rounds of changes anticipated, so this might be too early for this. But at the end I'd like to see us test a few scenarios:

  1. SQS message every 5 sec that results in 250 events per SQS message
  2. SQS message as quickly as possible that results in 1 event per SQS message
  3. SQS message every 5 minutes that results in 64k events per SQS message
  4. S3 bucket with object added every 5 sec that results in 250 events per object added
  5. S3 bucket where we add objects as quickly as possible that result in one event per object
  6. S3 bucket were we add object every 5 minutes that results in 64k events per object
  7. S3 bucket with 64k objects and append data that results in one event to a randomly selected object every second

@faec
Copy link
Contributor Author

faec commented May 3, 2024

@leehinman I was expecting to write more tests ("all existing tests pass" isn't as reassuring as I want it to be). Some scenarios you mentioned might be out of scope for this pass -- one thing I didn't change at all is the S3/SQS API proxies, so things like "number of events per S3 object" or "appending an event to an object" are opaque to this change, it's all handled internally in sqsProcessor and s3ObjectHandler. So for confirming behavior is unchanged, we can restrict scope to three interface points:

  • Calls to S3/SQS API proxies
  • Calls to create a Beats pipeline client, or publish an event through one
  • Calls to awss3.inputMetrics, the helper object used to update the input's monitoring variables.

There are existing tests for at least the first two using auto-generated mocks, and I've updated them for the new code. I'm not satisfied with them, though -- they're the sort of mocks where you list the exact order and inputs of each mocked call even if it isn't part of the spec, so this PR broke them just by removing redundant calls to deterministic functions and changing who's responsible for starting a helper goroutine.

One thing I was considering was rewriting / expanding those tests with more granular boundaries -- e.g. instead of checking that the full input calls NextPage the right number of times, then calls GetObject the right number of times, then calls Publish the right number of times, just confirm that objects given to the reader loop are sent to the work channel, and separately, that objects sent to the work channel are published by the work loop.

Having said all that: does that plan sound good? Are there any areas of this PR where you'd especially like to see expanded test coverage?

@leehinman
Copy link
Contributor

@leehinman I was expecting to write more tests ("all existing tests pass" isn't as reassuring as I want it to be).

:-) yeah.

Some scenarios you mentioned might be out of scope for this pass

Absolutely out of scope. I wrote them down to start thinking/talking about them. Probably should have done 2 separate comments.

-- one thing I didn't change at all is the S3/SQS API proxies, so things like "number of events per S3 object" or "appending an event to an object" are opaque to this change, it's all handled internally in sqsProcessor and s3ObjectHandler. So for confirming behavior is unchanged, we can restrict scope to three interface points:

* Calls to S3/SQS API proxies

* Calls to create a Beats pipeline client, or publish an event through one

* Calls to `awss3.inputMetrics`, the helper object used to update the input's monitoring variables.

There are existing tests for at least the first two using auto-generated mocks, and I've updated them for the new code. I'm not satisfied with them, though -- they're the sort of mocks where you list the exact order and inputs of each mocked call even if it isn't part of the spec, so this PR broke them just by removing redundant calls to deterministic functions and changing who's responsible for starting a helper goroutine.

One thing I was considering was rewriting / expanding those tests with more granular boundaries -- e.g. instead of checking that the full input calls NextPage the right number of times, then calls GetObject the right number of times, then calls Publish the right number of times, just confirm that objects given to the reader loop are sent to the work channel, and separately, that objects sent to the work channel are published by the work loop.

Having said all that: does that plan sound good? Are there any areas of this PR where you'd especially like to see expanded test coverage?

Any additional automated tests would be great.

For this PR a manual test that either mode still works as expected would be fine.

@zmoog
Copy link
Contributor

zmoog commented May 8, 2024

I am running some manual tests ingesting CloudTrail logs using the SQS mode.

I'm using a very simple setup, here are the differences with the default modules.d/aws.yml file:

  - module: aws
    cloudtrail:
      #    enabled: false
      enabled: true

      # AWS SQS queue url
      var.queue_url: https://sqs.eu-north-1.amazonaws.com/<redacted>/mbranca-cloudtrail-logs

      var.access_key_id: <redacted>
      var.secret_access_key: <redacted>

Filebeat is running and ingesting CloudTrail logs. Here are a couple of screenshot of the SQS queue and the logs on Discover:

CleanShot 2024-05-08 at 22 43 02

CleanShot 2024-05-08 at 22 51 52

@zmoog
Copy link
Contributor

zmoog commented May 8, 2024

I am also running manual tests ingesting CloudTrail logs from the same S3 bucket using the S3 polling.

Here's the config:

  - module: aws
    cloudtrail:
      enabled: true

      # AWS S3 bucket arn
      var.bucket_arn: 'arn:aws:s3:::<redacted>'

      # Number of workers on S3 bucket
      var.number_of_workers: 5

The S3 polling mode panics because the input log is nil; however, I was able to successfully run the input with this small change:

diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go
index 50786626d2..999b27da53 100644
--- a/x-pack/filebeat/input/awss3/s3_input.go
+++ b/x-pack/filebeat/input/awss3/s3_input.go
@@ -68,11 +68,11 @@ func (in *s3PollerInput) Run(
        inputContext v2.Context,
        pipeline beat.Pipeline,
 ) error {
-       log := inputContext.Logger.Named("s3")
+       in.log = inputContext.Logger.Named("s3")
        var err error
 
        // Load the persistent S3 polling state.
-       in.states, err = newStates(log, in.store)
+       in.states, err = newStates(in.log, in.store)
        if err != nil {
                return fmt.Errorf("can not start persistent store: %w", err)
        }
@@ -95,7 +95,7 @@ func (in *s3PollerInput) Run(
        defer in.metrics.Close()
 
        in.s3ObjectHandler = newS3ObjectProcessorFactory(
-               log,
+               in.log,
                in.metrics,
                in.s3,
                in.config.getFileSelectors(),

The log ingestion is making progress:

CleanShot 2024-05-08 at 23 19 16

@zmoog
Copy link
Contributor

zmoog commented May 8, 2024

A recent change in the main branch created an inconsistency in the versions of the AWS SDK core and services modules. In practice, all the AWS inputs I tested failed with a "not found, ResolveEndpointV2" error.

A PR is being reviewed to address this problem.

Copy link
Contributor

@zmoog zmoog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new design. It breaks down the current complexity into smaller chunks that are easier to understand and follow how they fit together.

We should address the nil log in the S3 polling mode.

Comment on lines +230 to +248
func (c config) getBucketName() string {
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.BucketARN != "" {
return getBucketNameFromARN(c.BucketARN)
}
return ""
}

func (c config) getBucketARN() string {
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.BucketARN != "" {
return c.BucketARN
}
return ""
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea.

Comment on lines +62 to +63
if config.QueueURL != "" {
return newSQSReaderInput(config, awsConfig), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new input.go is pretty neat: only having the s3InputManager makes the input more manageable.

Comment on lines 250 to 273
// A callback to apply the configuration's settings to an S3 options struct.
// Should be provided to s3.NewFromConfig.
func (c config) s3ConfigModifier(o *s3.Options) {
if c.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
}

if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = c.PathStyle

o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
so.MaxAttempts = 5
// Recover quickly when requests start working again
so.NoRetryIncrement = 100
})
}

func (c config) sqsConfigModifier(o *sqs.Options) {
if c.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uber nit: maybe we can highlight a little more these are AWS SDK callbacks. I see we mention "s3.NewFromConfig", but it's at the end of the sentence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

mergify bot commented May 8, 2024

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b awss3-cleanup upstream/awss3-cleanup
git merge upstream/main
git push upstream awss3-cleanup

@faec
Copy link
Contributor Author

faec commented May 9, 2024

Fixed the uninitialized logger, added comments, and synced with main. Doing my own manual testing now.

Copy link
Contributor

@leehinman leehinman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming manual testing passes. LGTM.

Copy link
Contributor

@zmoog zmoog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@faec faec merged commit cc35cce into elastic:main May 9, 2024
97 of 100 checks passed
@faec faec deleted the awss3-cleanup branch May 9, 2024 20:03
v1v added a commit to v1v/beats that referenced this pull request May 15, 2024
…-actions

* upstream/main: (313 commits)
  github-action: delete opentelemetry workflow (elastic#39559)
  updatecli: move to the .github folder and support for signed commits (elastic#39472)
  Osquerybeat: Add action responses data stream (elastic#39143)
  [winlogbeat] performance improvment; avoid rendering event message twice (elastic#39544)
  Fix the AWS SDK dependencies issue causing the "not found, ResolveEndpointV2" error (elastic#39454)
  x-pack/filebeat/input/cel: add http metrics collection (elastic#39503)
  build(deps): bump github.com/elastic/elastic-agent-libs from 0.9.4 to 0.9.7 (elastic#39424)
  Remove unused env vars from pipelines (elastic#39534)
  [BK] - Remove osx steps from branch execution (elastic#39552)
  [BK] - Remove certain steps from running for Branches (elastic#39533)
  Allow dependabot report BK status checks (elastic#39540)
  Remove hardcoded module definitions in CI (elastic#39506)
  Explicitly set DOCKER_PULL, RACE_DETECTOR and TEST_COVERAGE for pipelines (elastic#39510)
  Fixed pipelines formatting (elastic#39513)
  Update filebeat pipeline to match Jenkins steps (elastic#39261)
  Add error check to groupToEvents so we don't blindly add error values (elastic#39404)
  Remove fields not needed for session view in add_session_view processor (elastic#39500)
  `aws-s3` input: Split S3 poller and SQS reader into explicit input objects (elastic#39353)
  ci(jenkins): remove post-build notifications (elastic#39483)
  [DOCS] Add the `read_pipeline` cluster privilege for winlogbeat and the `auto_configure` index privilege to beats documentation (elastic#38534)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cleanup Team:Elastic-Agent Label for the Agent team Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants