[ETL-634] Implement Dispatch Lambda #114

Merged: 4 commits into main from etl-634 on May 28, 2024
Conversation

@philerooski (Contributor) commented on May 16, 2024:

A lot of stacks were created/refactored as part of this PR, so here's a diagram to make things easier to understand:

[Diagram: ETL-634]

Major changes include:

  • A new Dispatch Lambda which is responsible for dispatching each JSON in an export as a separate job. These jobs will eventually be handled by the (to be implemented) Dispatch to Raw Lambda, where each JSON will be decompressed and uploaded to the Raw S3 bucket (see the sketch after this list).
  • A refactor of the existing stacks responsible for propagating new exports to Glue. Now that we have two SQS queues subscribed to the SNS topic which is published to by our S3 input bucket, we need to name these things more precisely.
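To make the dispatch step concrete, here is a minimal sketch of what the Dispatch Lambda does, assuming the message it receives points at an export archive in the input bucket. The bucket/key arguments, the topic ARN, and the message shape are illustrative placeholders rather than the exact code in this PR:

```python
import io
import json
import zipfile

import boto3

# Placeholder ARN; in the real stack this would come from the Lambda's environment.
DISPATCH_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:dispatch"


def dispatch_export(bucket: str, key: str, s3_client, sns_client) -> None:
    """Publish one dispatch message per JSON file inside an export archive."""
    # Read the export archive from the input bucket.
    export = s3_client.get_object(Bucket=bucket, Key=key)
    archive = zipfile.ZipFile(io.BytesIO(export["Body"].read()))
    for member in archive.infolist():
        if not member.filename.endswith(".json"):
            continue
        # Each JSON becomes an independent job for the Dispatch to Raw Lambda.
        sns_client.publish(
            TopicArn=DISPATCH_TOPIC_ARN,
            Message=json.dumps(
                {"Bucket": bucket, "Key": key, "Path": member.filename}
            ),
        )
```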

Some implementation patterns of note:

  • I split the AWS::SNS::TopicPolicy from the AWS::SNS::Topic in the SNS stacks. This is useful because (for example) the dispatch Lambda needs to reference the dispatch SNS topic, but likewise the AWS::SNS::TopicPolicy needs to reference the Lambda to allow it to publish messages to the dispatch SNS topic. Splitting this out (which is also part of the refactor for SNS Input) allows us to avoid the circular dependency.
  • I separated the business logic of the dispatch Lambda (main) from the entrypoint (lambda_handler). This primarily helps with testing of main. It's significantly more straightforward to use moto to pass mocked boto3 clients to main than it is to patch clients created within the main function.
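As a rough illustration of that split (a sketch only; the actual signature of main in src/lambda_function/dispatch/app.py may differ), the entrypoint constructs the real clients and hands them to the business logic, so tests can pass in moto-backed clients instead of patching boto3 internals:

```python
import boto3


def lambda_handler(event, context):
    # Thin entrypoint: construct real clients here, then delegate.
    return main(
        event=event,
        s3_client=boto3.client("s3"),
        sns_client=boto3.client("sns"),
    )


def main(event, s3_client, sns_client):
    # Business logic takes its clients as arguments, so a test can hand it
    # clients created inside a moto mock instead of patching boto3.
    ...


# In a test (assuming moto >= 5, which exposes the mock_aws decorator):
#
#     from moto import mock_aws
#
#     @mock_aws
#     def test_main():
#         s3_client = boto3.client("s3", region_name="us-east-1")
#         sns_client = boto3.client("sns", region_name="us-east-1")
#         main(event=some_event, s3_client=s3_client, sns_client=sns_client)
```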

@rxu17 (Contributor) left a comment:
WOW, this was a lot of work! 🔥 Added my review

@rxu17 (Contributor) commented on May 17, 2024:

So I'm looking at the Great Expectations design here:
[image]

So this new Lambda Dispatch is supposed to be the lambda in this section right here in the picture (the unzip and sort), correct?
Why are we adding another SNS topic between the Lambda Dispatch and the S3 raw data bucket? I don't see it in the original design.

@philerooski (Contributor Author) replied:

No, that figure is out of date. I realized that I can't just have the bucket broadcast "hey, NEW export!" via SNS and have the unzip work distributed across multiple Lambdas. I needed another Lambda to coordinate all of that. So the SNS you see in the diagram above is actually the dispatch SNS.

@rxu17 (Contributor) commented on May 18, 2024:

So if I'm understanding this correctly, this part of the pipeline is now:
New JSON exports -> S3 input bucket -> SQS input to dispatch -> Lambda Dispatch (process and gather unzip work needed) -> SNS Dispatch -> Dispatch to Raw Lambda (one lambda for each export type?)

Yikes, this seems much more complex. Do you know if it was one of the earlier pre-existing ETL components that might have caused this complexity / just limitations of AWS?

@philerooski (Contributor Author) replied:

You're missing a few components:

New JSON exports -> S3 input bucket ->
*SNS input ->
SQS input to dispatch -> Lambda Dispatch (process and gather unzip work needed) -> SNS Dispatch ->
*SQS dispatch to raw ->
Dispatch to Raw Lambda

I know it seems like a lot, but the SNS -> SQS components are cookie-cutter simple. Now that I've refactored those in a scalable way as part of this PR it's just a matter of copy/pasting and updating the name/input/output in the stack definitions to create a new SNS -> SQS component.

@philerooski (Contributor Author) replied:

do you know if it was one of the earlier pre-existing ETL components that might have caused this complexity / just limitations of AWS?

Nothing like that. I wanted a Lambda that would decompress and upload an individual JSON file to S3, but I realized that I would need another Lambda to dispatch each of these jobs if I wanted to do things in a serverless style (i.e., with Lambdas rather than provisioning an underlying EC2 as we would with Glue, for example).
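For illustration, a Dispatch to Raw worker along those lines might look roughly like this. This is hypothetical, since that Lambda is not implemented in this PR, and the bucket and key names are placeholders:

```python
import io
import zipfile

import boto3

RAW_BUCKET = "recover-raw"  # placeholder name


def upload_json_to_raw(bucket: str, key: str, path: str, s3_client) -> None:
    """Extract a single JSON member from an export archive and upload it,
    decompressed, to the raw bucket."""
    export = s3_client.get_object(Bucket=bucket, Key=key)
    with zipfile.ZipFile(io.BytesIO(export["Body"].read())) as archive:
        with archive.open(path) as member:
            s3_client.put_object(
                Bucket=RAW_BUCKET,
                Key=f"json/{path}",
                Body=member.read(),
            )
```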

A contributor replied:

I think it's just that AWS doesn't have the best visualization of our stacks. With so many new components, we have to manually check that each one feeds into, and receives from, the correct components.

@rxu17 (Contributor) commented on May 17, 2024:

Grouping the stacks by pipeline type

Given the growing number of objects in our config folder and the increasing complexity of our ETL pipeline, I'm wondering if we should separate our regular pipeline stacks from the GX data quality stacks. Say, something like develop/namespaced/data_quality_pipeline or develop/namespaced/gx_pipeline vs. develop/namespaced/main_pipeline? This would make it much easier for us to distinguish what belongs where (separation of concerns).

Example:

/recover
├── develop/
│   ├── main-pipeline/
│   |   ├── namespaced/
│   │    |   ├── sns-input.yaml
│   ├── gx-pipeline/
│   |   ├── namespaced/
│   │    |   ├── sns-dispatch.yaml
├── prod/
│   ├── main-pipeline/
│   |   ├── namespaced/
│   │    |   ├── sns-input-to-sqs.yaml
...

If that is too much work, then I think we need to consider renaming the objects that are specific to the GX work vs. objects that are part of the main pipeline.

For example, setting prefixes like etl-main (e.g., etl-main-sns-input-to-sqs) or main-pipeline for main pipeline components, and then dq (data quality), gx-pipeline, or gx for GX pipeline components (just throwing ideas here).

Renaming components to be more specific

I also feel like some of the component names themselves are too generic / not specific enough.
If someone switched back to RECOVER work after a few months off, it would be difficult to remember what sns-input and sns-dispatch are, especially where each fits in the pipeline. This is similar to giving a function in our code a name like def input().

Something like: sns-input renamed to sns-input-to-sqs, and sns-dispatch renamed to sns-lambda-dispatch-to-raw (just throwing ideas here).

@philerooski (Contributor Author) replied:

I'm wondering if we should be separating out our regular pipeline stacks from the GX data quality stacks

I mentioned this a bit in my previous comment, but there's an opportunity here to consolidate GX stacks with regular stacks. This not only reduces the complexity of our pipeline but also ensures that every job is looking at the same set of data. So I think it's premature at this point to start organizing stacks in this way because we have some significant architecture changes potentially occurring in the near future.

That said, this idea of organizing the stacks more logically is a nice one. Perhaps we could use the buckets as "partition" dividers, since those components are unlikely to depend on one another? So we have one folder for input-intermediate stacks and another for intermediate-processed stacks. But this partition might also disappear if we can throw out the S3 to JSON components and start immediately working with the raw data as a Spark dataframe, as I mentioned in my other comment. More on this organization idea below -- it might make more sense to organize stacks at a more precise level.

If someone switched to RECOVER work after a few months off, it would be difficult to remember what is sns-input and what is sns-dispatch esp and where that fits where in the pipeline.

I also brought this example up in a previous comment, but the SNS topics are defined by what they broadcast, not by who subscribes to them, so sns-input is more accurate than sns-input-to-sqs. Still, these names aren't great because there could potentially be other SNS stacks that broadcast inputs or dispatch jobs. Right now we only have one concept associated with "input" (the input S3 bucket) and one with "dispatch" (the SQS/Lambda/SNS flow implemented here), but we could make this more precise/future-proof. One way to do this, besides (or in addition to) renaming, is to organize stacks so that individual components (like our SQS/Lambda/SNS "dispatch" component, for example) are in their own directory: something like config/{account}/namespaced/dispatch/. The stacks in a component are unlikely to be separated from one another in the architecture, or deployed individually without the component's other stacks.

Let's keep this as-is for now, but I opened a ticket for us to investigate how we could logically organize our stacks more effectively: https://sagebionetworks.jira.com/browse/ETL-649

@rxu17 (Contributor) commented on May 18, 2024:

premature at this point to start organizing stacks in this way because we have some significant architecture changes potentially occurring in the near future.

If that is the case, should we prioritize looking into those potentially big architecture changes right now, or is there something blocking us from doing that? It also seems like it would help with troubleshooting GX if we reduced the complexity of the architecture sooner rather than later, instead of adding more components (possible tech debt, from the sounds of it), making it more complex and difficult to troubleshoot, and then having to remove and refactor them later.

Thanks for making the ticket!

@philerooski (Contributor Author) replied:

I wouldn't call these architecture changes, since we aren't changing anything about the stacks. We're just organizing the stacks under config/ in a way that makes deployment more functional. For example, suppose you were working on this PR, and you wanted to test changes related to the "dispatch" component. You could deploy config/develop (which takes a relatively long time), or explicitly name each stack you want to deploy (which is a lot of typing and remembering), or rely on your understanding of the dispatch component's stack dependencies and deploy a subset thereof (which is complicated). OR, if we organized our stacks in a smart way, you might be able to do something like:

sceptre deploy --var "namespace=bla" config/develop/namespaced/dispatch_component

And everything related to that component would be deployed for you. There are other ways we could organize our stacks besides by component; that's what we want to think about as part of that ticket.

@philerooski (Contributor Author) replied:

Oops, after rereading our comments I see what you mean now by architecture changes. The architecture change that I propose would entail hooking the S3 to JSON job up to the raw bucket, which wouldn't affect GX but would require some modifications to the flow we currently call input to intermediate. Right now we have a functioning input-to-intermediate flow, so I don't want to make those changes yet: they don't provide any additional functionality, they merely reduce redundancy with what we're doing for GX.

@philerooski (Contributor Author) added:

The other architecture change I proposed was getting rid of the S3 to JSON job altogether, loading the raw bucket's JSON data directly into a Spark dataframe, and doing all our transformations in Spark. That would also be independent of the work we're doing for GX, but it could help reduce redundancy with stacks we are implementing for GX and optimize existing functionality (again, without adding any new functionality).
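As a sketch of that idea (assuming the raw bucket holds the JSON in a Spark-readable layout; the bucket name and prefix below are placeholders), the S3 to JSON step could in principle be replaced by reading the raw data straight into Spark:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the raw bucket's JSON directly into a dataframe and transform in Spark,
# instead of materializing intermediate JSON with the S3 to JSON job.
raw_df = spark.read.json("s3://recover-raw/json/dataset=fitbitactivitylogs/")
raw_df.printSchema()
```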

A contributor replied:

Gotcha! Then I would say I'm for organizing the stacks first, since the above architectural changes are big enough (and they impact current functionality, as we'd be reworking it a lot) that it would be some time before we move toward them. Organizing the stacks first will also make it easier on us to implement the bigger architectural changes later.

Resolved review threads:
  • src/lambda_function/dispatch/README.md
  • src/lambda_function/dispatch/app.py (3 threads)
  • tests/test_lambda_dispatch.py

Quality Gate passed

Issues
51 New issues
0 Accepted issues

Measures
1 Security Hotspot
No data about Coverage
3.8% Duplication on New Code

See analysis details on SonarCloud

@rxu17 (Contributor) left a comment:

LGTM!

@thomasyu888 (Member) left a comment:

🔥 Nice work and review here!

@philerooski merged commit 2d4dab4 into main on May 28, 2024. 15 checks passed.
@thomasyu888 deleted the etl-634 branch on October 30, 2024.