From faf455ce0032439805b094e25b017f87c057696d Mon Sep 17 00:00:00 2001 From: Ryan Lim Date: Mon, 11 Sep 2023 16:19:26 -0700 Subject: [PATCH] change to using sns --- Dockerfile | 2 +- miniwdl-plugins/sns_notification/README.md | 1 + .../setup.py | 10 ++++----- .../sns_notification.py} | 21 +++++++++---------- miniwdl-plugins/sqs_notification/README.md | 1 - terraform/modules/swipe-sfn-batch-job/main.tf | 2 +- .../modules/swipe-sfn-batch-job/variables.tf | 10 +++------ terraform/modules/swipe-sfn/main.tf | 3 +-- terraform/modules/swipe-sfn/notifications.tf | 7 ++++--- 9 files changed, 26 insertions(+), 31 deletions(-) create mode 100644 miniwdl-plugins/sns_notification/README.md rename miniwdl-plugins/{sqs_notification => sns_notification}/setup.py (72%) rename miniwdl-plugins/{sqs_notification/sqs_notification.py => sns_notification/sns_notification.py} (79%) delete mode 100644 miniwdl-plugins/sqs_notification/README.md diff --git a/Dockerfile b/Dockerfile index 7d4c8fb3..88c8d99e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -63,7 +63,7 @@ ADD miniwdl-plugins miniwdl-plugins RUN pip install miniwdl-plugins/s3upload RUN pip install miniwdl-plugins/sfn_wdl RUN pip install miniwdl-plugins/s3parcp_download -RUN pip install miniwdl-plugins/sqs_notification +RUN pip install miniwdl-plugins/sns_notification RUN cd /usr/bin; curl -O https://amazon-ecr-credential-helper-releases.s3.amazonaws.com/0.4.0/linux-amd64/docker-credential-ecr-login RUN chmod +x /usr/bin/docker-credential-ecr-login diff --git a/miniwdl-plugins/sns_notification/README.md b/miniwdl-plugins/sns_notification/README.md new file mode 100644 index 00000000..243062ab --- /dev/null +++ b/miniwdl-plugins/sns_notification/README.md @@ -0,0 +1 @@ +# sns_notifications \ No newline at end of file diff --git a/miniwdl-plugins/sqs_notification/setup.py b/miniwdl-plugins/sns_notification/setup.py similarity index 72% rename from miniwdl-plugins/sqs_notification/setup.py rename to miniwdl-plugins/sns_notification/setup.py index 6a9723c0..ce3e66ca 100644 --- a/miniwdl-plugins/sqs_notification/setup.py +++ b/miniwdl-plugins/sns_notification/setup.py @@ -7,23 +7,23 @@ long_description = f.read() setup( - name="sqs_notification", + name="sns_notification", version="0.0.1", description="miniwdl plugin for notification of task completion to Amazon SQS", - url="https://github.com/chanzuckerberg/miniwdl-s3upload", + url="https://github.com/chanzuckerberg/swipe", project_urls={}, long_description=long_description, long_description_content_type="text/markdown", author="", - py_modules=["sqs_notification"], + py_modules=["sns_notification"], python_requires=">=3.6", setup_requires=["reentry"], install_requires=["boto3"], reentry_register=True, entry_points={ - "miniwdl.plugin.task": ["sqs_notification_task = sqs_notification:task"], + "miniwdl.plugin.task": ["sns_notification_task = sns_notification:task"], "miniwdl.plugin.workflow": [ - "sqs_notification_workflow = sqs_notification:workflow" + "sns_notification_workflow = sns_notification:workflow" ], }, ) diff --git a/miniwdl-plugins/sqs_notification/sqs_notification.py b/miniwdl-plugins/sns_notification/sns_notification.py similarity index 79% rename from miniwdl-plugins/sqs_notification/sqs_notification.py rename to miniwdl-plugins/sns_notification/sns_notification.py index dfc41014..6e891521 100644 --- a/miniwdl-plugins/sqs_notification/sqs_notification.py +++ b/miniwdl-plugins/sns_notification/sns_notification.py @@ -1,5 +1,5 @@ """ -TODO +Send SNS notifications after each miniwdl step """ import os @@ -10,8 +10,8 @@ import boto3 -sqs_client = boto3.client("sqs", endpoint_url=os.getenv("AWS_ENDPOINT_URL")) -queue_url = os.getenv("AWS_STEP_NOTIFICATION_PLUGIN") +sns_client = boto3.client("sns", endpoint_url=os.getenv("AWS_ENDPOINT_URL")) +topic_arn = os.getenv('STEP_NOTIFICATION_TOPIC_ARN') def process_outputs(outputs: Dict): @@ -21,14 +21,13 @@ def process_outputs(outputs: Dict): def send_message(attr, body): - """send message to SQS, eventually wrap this in a try catch to deal with throttling""" - sqs_resp = sqs_client.send_message( - QueueUrl=queue_url, - DelaySeconds=0, + """send message to SNS""" + sns_resp = sns_client.publish( + TopicArn=topic_arn, + Message=body, MessageAttributes=attr, - MessageBody=body, ) - return sqs_resp + return sns_resp def task(cfg, logger, run_id, run_dir, task, **recv): @@ -36,7 +35,7 @@ def task(cfg, logger, run_id, run_dir, task, **recv): on completion of any task, upload its output files to S3, and record the S3 URI corresponding to each local file (keyed by inode) in _uploaded_files """ - logger = logger.getChild("s3_progressive_upload") + logger = logger.getChild("sns_step_notification") # ignore inputs recv = yield recv @@ -75,7 +74,7 @@ def workflow(cfg, logger, run_id, run_dir, workflow, **recv): with local filenames rewritten to the uploaded S3 URIs (as previously recorded on completion of each task). """ - logger = logger.getChild("s3_progressive_upload") + logger = logger.getChild("sns_step_notification") # ignore inputs recv = yield recv diff --git a/miniwdl-plugins/sqs_notification/README.md b/miniwdl-plugins/sqs_notification/README.md deleted file mode 100644 index 0b569748..00000000 --- a/miniwdl-plugins/sqs_notification/README.md +++ /dev/null @@ -1 +0,0 @@ -# sqs_notifications \ No newline at end of file diff --git a/terraform/modules/swipe-sfn-batch-job/main.tf b/terraform/modules/swipe-sfn-batch-job/main.tf index 0a8934e9..82cfc17f 100644 --- a/terraform/modules/swipe-sfn-batch-job/main.tf +++ b/terraform/modules/swipe-sfn-batch-job/main.tf @@ -34,7 +34,7 @@ locals { "MINIWDL__DOWNLOAD_CACHE__DISABLE_PATTERNS" = "[\"s3://swipe-samples-*/*\"]", "DOWNLOAD_CACHE_MAX_GB" = "500", "WDL_PASSTHRU_ENVVARS" = join(" ", [for k, v in var.extra_env_vars : k]), - "AWS_STEP_NOTIFICATION_PLUGIN" = var.sfn_notification_queue_urls[0], + "STEP_NOTIFICATION_TOPIC_ARN" = var.sfn_notification_topic_arn, "OUTPUT_STATUS_JSON_FILES" = tostring(var.output_status_json_files) }) container_env_vars = { "environment" : [for k in sort(keys(local.batch_env_vars)) : { "name" : k, "value" : local.batch_env_vars[k] }] } diff --git a/terraform/modules/swipe-sfn-batch-job/variables.tf b/terraform/modules/swipe-sfn-batch-job/variables.tf index 35a053fa..4c52b5d3 100644 --- a/terraform/modules/swipe-sfn-batch-job/variables.tf +++ b/terraform/modules/swipe-sfn-batch-job/variables.tf @@ -71,12 +71,8 @@ variable "docker_network" { default = "" } -variable "sfn_notification_queue_arns" { - description = "ARNs of notification SQS queues" - type = list(string) +variable "sfn_notification_topic_arn" { + description = "ARN of notification sns topic" + type = string } -variable "sfn_notification_queue_urls" { - description = "URLs of notification SQS queues" - type = list(string) -} diff --git a/terraform/modules/swipe-sfn/main.tf b/terraform/modules/swipe-sfn/main.tf index b8e0d642..fe08de80 100644 --- a/terraform/modules/swipe-sfn/main.tf +++ b/terraform/modules/swipe-sfn/main.tf @@ -44,8 +44,7 @@ module "batch_job" { docker_network = var.docker_network call_cache = var.call_cache output_status_json_files = var.output_status_json_files - sfn_notification_queue_arns = [for name, queue in aws_sqs_queue.sfn_notifications_queue : queue.arn] - sfn_notification_queue_urls = [for name, queue in aws_sqs_queue.sfn_notifications_queue : queue.url] + sfn_notification_topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn tags = var.tags } diff --git a/terraform/modules/swipe-sfn/notifications.tf b/terraform/modules/swipe-sfn/notifications.tf index c0a04eaf..c7025b47 100644 --- a/terraform/modules/swipe-sfn/notifications.tf +++ b/terraform/modules/swipe-sfn/notifications.tf @@ -64,9 +64,10 @@ data "aws_iam_policy_document" "sfn_notifications_topic_policy_document" { resource "aws_sns_topic_subscription" "sfn_notifications_sqs_target" { for_each = var.sqs_queues - topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn - protocol = "sqs" - endpoint = aws_sqs_queue.sfn_notifications_queue[each.key].arn + topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn + protocol = "sqs" + endpoint = aws_sqs_queue.sfn_notifications_queue[each.key].arn + raw_message_delivery = true } resource "aws_sqs_queue" "sfn_notifications_queue" {