
[ETL-636] Raw Lambda #138

Merged
merged 5 commits into main from etl-636 on Sep 6, 2024

Conversation

philerooski (Contributor)

Adds resources:

  • Lambda: The raw Lambda, which polls the dispatch-to-raw SQS queue and writes compressed JSON to the raw S3 bucket.
  • IAM Role: An IAM role for the above Lambda.

The raw Lambda compresses the JSON data contained in an export (a zip archive) from the input S3 bucket and uploads it to the raw S3 bucket. It makes heavy use of Python file objects and S3 multipart uploads, so it can download, compress, and upload with a relatively low, fixed memory overhead regardless of the size of the uncompressed JSON. For example, during testing I was able to process a 450 MB (uncompressed) JSON file with a maximum memory usage of 345 MB.
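For reviewers unfamiliar with the pattern, here is a minimal sketch of the consumer side, assuming a generator that yields compressed chunks (all names are illustrative, not the actual implementation). Because only one chunk is ever held in memory at a time, peak usage tracks the part size rather than the object size:

```python
import boto3

s3_client = boto3.client("s3")

def upload_parts(bucket: str, key: str, compressed_parts) -> None:
    """Upload an iterable of byte chunks as one S3 multipart upload."""
    multipart = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    parts = []
    for part_number, part_data in enumerate(compressed_parts, start=1):
        # each chunk becomes one part; previous chunks are already out of memory
        response = s3_client.upload_part(
            Body=part_data,
            Bucket=bucket,
            Key=key,
            PartNumber=part_number,
            UploadId=multipart["UploadId"],
        )
        parts.append({"ETag": response["ETag"], "PartNumber": part_number})
    s3_client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=multipart["UploadId"],
        MultipartUpload={"Parts": parts},
    )
```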

@philerooski requested a review from a team as a code owner August 30, 2024 00:59

@@ -3,7 +3,7 @@ template:
parameters:
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
-    VisibilityTimeout: "120"
+    VisibilityTimeout: "900"
Member

Can you speak to the increase here?

Contributor Author (@philerooski)

We don't know yet how long this Lambda could take to run on prod-sized data, especially given the relatively small amount of memory I've allotted, so I've set the Lambda timeout to its maximum (900 seconds). Since the Lambda could take up to 900 seconds to complete, we also need the associated SQS message to reappear in the queue even if the Lambda fails at 899 seconds; hence we set the visibility timeout on the SQS queue to match the Lambda's timeout. AWS actually enforces this server side, so the deployment will fail if we use a smaller value.
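(If I recall correctly, the server-side check fires when the SQS trigger is wired up to the function, not when the queue is created. A hypothetical sketch, with made-up names and ARN:)

```python
import boto3

lambda_client = boto3.client("lambda")

# Fails with an InvalidParameterValueException if the queue's VisibilityTimeout
# is less than the function's Timeout (here, 900 seconds).
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:us-east-1:123456789012:dispatch-to-raw",  # hypothetical
    FunctionName="raw-lambda",  # hypothetical
)
```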

    - The data type is derived from the first underscore-delimited component of the file basename.
    """
    key_components = key.split("/")
    # input bucket keys are formatted like `{namespace}/{cohort}/{export_basename}`
Member (@thomasyu888)

Just checking, but is it always formatted like this?

Contributor Author (@philerooski)

@thomasyu888 It could change in the future, but that would also mean changing every other stack that tries to read data from the input bucket, so no harm in hard coding this format.

Member

Let's hope it doesn't change in the future.
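To make the format concrete, here is a hypothetical parsing sketch (the function name and the example key are made up for illustration):

```python
def parse_key(key: str) -> dict:
    # input bucket keys are formatted like `{namespace}/{cohort}/{export_basename}`
    namespace, cohort, export_basename = key.split("/")
    # the data type is the first underscore-delimited component of the basename
    data_type = export_basename.split("_")[0]
    return {
        "namespace": namespace,
        "cohort": cohort,
        "data_type": data_type,
    }

# hypothetical example key
parse_key("main/cohort_A/DataTypeName_2024-08-30.zip")
# -> {'namespace': 'main', 'cohort': 'cohort_A', 'data_type': 'DataTypeName'}
```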

# }

Notes:
- Parts must be larger than AWS minimum requirements (typically 5 MB),
Member

Did you need to configure anything to make sure the parts are > 5MB?

Contributor Author (@philerooski)

That's taken care of by yield_compressed_data. The part_threshold is set to 8 MB by default.
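For future readers, a sketch of what a `yield_compressed_data`-style generator might look like; this is my reconstruction from the discussion above, not the code under review. A chunk is only yielded once the compressed buffer reaches `part_threshold`, so every part except the last clears the 5 MB minimum:

```python
import gzip
import io

def yield_compressed_data(fileobj, part_threshold: int = 8 * 1024 * 1024):
    """Yield gzip-compressed chunks of at least `part_threshold` bytes
    (except possibly the final chunk)."""
    buffer = io.BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode="wb") as gzip_stream:
        # read the source in small pieces so memory stays bounded
        while chunk := fileobj.read(1024 * 1024):
            gzip_stream.write(chunk)
            if buffer.tell() >= part_threshold:
                yield buffer.getvalue()
                buffer.seek(0)
                buffer.truncate()
    # closing the GzipFile flushed the gzip trailer into the buffer
    yield buffer.getvalue()
```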

@thomasyu888 (Member) left a comment

🔥 LGTM! Going to pre-approve, but not sure if @rxu17 had any last comments?

Comment on lines +300 to +306
with io.BytesIO() as object_stream:
    s3_client.download_fileobj(
        Bucket=sns_message["Bucket"],
        Key=sns_message["Key"],
        Fileobj=object_stream,
    )
    object_stream.seek(0)
Contributor

This is super cool how you can do this
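(Aside: the seek(0) rewinds the in-memory stream so the next reader starts at the beginning. A hypothetical continuation of this hunk, since the member-handling code isn't shown here, might look like:)

```python
import zipfile

# hypothetical continuation inside the same `with io.BytesIO()` block:
with zipfile.ZipFile(object_stream, "r") as export_archive:
    for json_path in export_archive.namelist():
        # each archive member is a JSON file destined for the raw bucket
        ...
```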

@BryanFauble (Contributor) left a comment

LGTM!

@philerooski merged commit a68065b into main on Sep 6, 2024
18 checks passed
@thomasyu888 deleted the etl-636 branch on October 30, 2024 01:02