Skip to content

Commit

Permalink
change to using sns
Browse files Browse the repository at this point in the history
  • Loading branch information
rzlim08 committed Sep 11, 2023
1 parent 7975d03 commit faf455c
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ ADD miniwdl-plugins miniwdl-plugins
RUN pip install miniwdl-plugins/s3upload
RUN pip install miniwdl-plugins/sfn_wdl
RUN pip install miniwdl-plugins/s3parcp_download
RUN pip install miniwdl-plugins/sqs_notification
RUN pip install miniwdl-plugins/sns_notification

RUN cd /usr/bin; curl -O https://amazon-ecr-credential-helper-releases.s3.amazonaws.com/0.4.0/linux-amd64/docker-credential-ecr-login
RUN chmod +x /usr/bin/docker-credential-ecr-login
Expand Down
1 change: 1 addition & 0 deletions miniwdl-plugins/sns_notification/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# sns_notifications
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,23 @@
long_description = f.read()

setup(
name="sqs_notification",
name="sns_notification",
version="0.0.1",
description="miniwdl plugin for notification of task completion to Amazon SQS",
url="https://github.com/chanzuckerberg/miniwdl-s3upload",
url="https://github.com/chanzuckerberg/swipe",
project_urls={},
long_description=long_description,
long_description_content_type="text/markdown",
author="",
py_modules=["sqs_notification"],
py_modules=["sns_notification"],
python_requires=">=3.6",
setup_requires=["reentry"],
install_requires=["boto3"],
reentry_register=True,
entry_points={
"miniwdl.plugin.task": ["sqs_notification_task = sqs_notification:task"],
"miniwdl.plugin.task": ["sns_notification_task = sns_notification:task"],
"miniwdl.plugin.workflow": [
"sqs_notification_workflow = sqs_notification:workflow"
"sns_notification_workflow = sns_notification:workflow"
],
},
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
TODO
Send SNS notifications after each miniwdl step
"""

import os
Expand All @@ -10,8 +10,8 @@

import boto3

sqs_client = boto3.client("sqs", endpoint_url=os.getenv("AWS_ENDPOINT_URL"))
queue_url = os.getenv("AWS_STEP_NOTIFICATION_PLUGIN")
sns_client = boto3.client("sns", endpoint_url=os.getenv("AWS_ENDPOINT_URL"))
topic_arn = os.getenv('STEP_NOTIFICATION_TOPIC_ARN')


def process_outputs(outputs: Dict):
Expand All @@ -21,22 +21,21 @@ def process_outputs(outputs: Dict):


def send_message(attr, body):
"""send message to SQS, eventually wrap this in a try catch to deal with throttling"""
sqs_resp = sqs_client.send_message(
QueueUrl=queue_url,
DelaySeconds=0,
"""send message to SNS"""
sns_resp = sns_client.publish(
TopicArn=topic_arn,
Message=body,
MessageAttributes=attr,
MessageBody=body,
)
return sqs_resp
return sns_resp


def task(cfg, logger, run_id, run_dir, task, **recv):
"""
on completion of any task, upload its output files to S3, and record the S3 URI corresponding
to each local file (keyed by inode) in _uploaded_files
"""
logger = logger.getChild("s3_progressive_upload")
logger = logger.getChild("sns_step_notification")

# ignore inputs
recv = yield recv
Expand Down Expand Up @@ -75,7 +74,7 @@ def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
with local filenames rewritten to the uploaded S3 URIs (as previously recorded on completion of
each task).
"""
logger = logger.getChild("s3_progressive_upload")
logger = logger.getChild("sns_step_notification")

# ignore inputs
recv = yield recv
Expand Down
1 change: 0 additions & 1 deletion miniwdl-plugins/sqs_notification/README.md

This file was deleted.

2 changes: 1 addition & 1 deletion terraform/modules/swipe-sfn-batch-job/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ locals {
"MINIWDL__DOWNLOAD_CACHE__DISABLE_PATTERNS" = "[\"s3://swipe-samples-*/*\"]",
"DOWNLOAD_CACHE_MAX_GB" = "500",
"WDL_PASSTHRU_ENVVARS" = join(" ", [for k, v in var.extra_env_vars : k]),
"AWS_STEP_NOTIFICATION_PLUGIN" = var.sfn_notification_queue_urls[0],
"STEP_NOTIFICATION_TOPIC_ARN" = var.sfn_notification_topic_arn,
"OUTPUT_STATUS_JSON_FILES" = tostring(var.output_status_json_files)
})
container_env_vars = { "environment" : [for k in sort(keys(local.batch_env_vars)) : { "name" : k, "value" : local.batch_env_vars[k] }] }
Expand Down
10 changes: 3 additions & 7 deletions terraform/modules/swipe-sfn-batch-job/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,8 @@ variable "docker_network" {
default = ""
}

variable "sfn_notification_queue_arns" {
description = "ARNs of notification SQS queues"
type = list(string)
variable "sfn_notification_topic_arn" {
description = "ARN of notification sns topic"
type = string
}

variable "sfn_notification_queue_urls" {
description = "URLs of notification SQS queues"
type = list(string)
}
3 changes: 1 addition & 2 deletions terraform/modules/swipe-sfn/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ module "batch_job" {
docker_network = var.docker_network
call_cache = var.call_cache
output_status_json_files = var.output_status_json_files
sfn_notification_queue_arns = [for name, queue in aws_sqs_queue.sfn_notifications_queue : queue.arn]
sfn_notification_queue_urls = [for name, queue in aws_sqs_queue.sfn_notifications_queue : queue.url]
sfn_notification_topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
tags = var.tags
}

Expand Down
7 changes: 4 additions & 3 deletions terraform/modules/swipe-sfn/notifications.tf
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ data "aws_iam_policy_document" "sfn_notifications_topic_policy_document" {
resource "aws_sns_topic_subscription" "sfn_notifications_sqs_target" {
for_each = var.sqs_queues

topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
protocol = "sqs"
endpoint = aws_sqs_queue.sfn_notifications_queue[each.key].arn
topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
protocol = "sqs"
endpoint = aws_sqs_queue.sfn_notifications_queue[each.key].arn
raw_message_delivery = true
}

resource "aws_sqs_queue" "sfn_notifications_queue" {
Expand Down

0 comments on commit faf455c

Please sign in to comment.