Skip to content

Commit

Permalink
add formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
rzlim08 committed Sep 11, 2023
1 parent faf455c commit e9f58e4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
19 changes: 8 additions & 11 deletions miniwdl-plugins/sns_notification/sns_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Dict
from datetime import datetime
from WDL import values_to_json
from WDL._util import StructuredLogMessage as _

import boto3

Expand All @@ -24,25 +25,25 @@ def send_message(attr, body):
"""send message to SNS"""
sns_resp = sns_client.publish(
TopicArn=topic_arn,
Message=body,
Message=body,
MessageAttributes=attr,
)
return sns_resp


def task(cfg, logger, run_id, run_dir, task, **recv):
"""
on completion of any task, upload its output files to S3, and record the S3 URI corresponding
to each local file (keyed by inode) in _uploaded_files
on completion of any task sends a message to sns with the output files
"""
logger = logger.getChild("sns_step_notification")
log = logger.getChild("sns_step_notification")

# ignore inputs
recv = yield recv

# ignore command/runtime/container
recv = yield recv

log.info(_("sending message to sns"))

message_attributes = {
"WorkflowName": {"DataType": "String", "StringValue": run_id[0]},
"TaskName": {"DataType": "String", "StringValue": run_id[-1]},
Expand All @@ -69,14 +70,10 @@ def task(cfg, logger, run_id, run_dir, task, **recv):


def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
"""
on workflow completion, add a file outputs.s3.json to the run directory, which is outputs.json
with local filenames rewritten to the uploaded S3 URIs (as previously recorded on completion of
each task).
"""
logger = logger.getChild("sns_step_notification")
log = logger.getChild("sns_step_notification")

# ignore inputs
recv = yield recv

log.info(_("ignores workflow calls"))
yield recv
28 changes: 14 additions & 14 deletions terraform/modules/swipe-sfn/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ resource "aws_iam_role_policy_attachment" "swipe_sfn_service" {
}

module "batch_job" {
source = "../swipe-sfn-batch-job"
app_name = var.app_name
batch_job_docker_image = var.batch_job_docker_image
batch_job_timeout_seconds = var.batch_job_timeout_seconds
miniwdl_dir = var.miniwdl_dir
workspace_s3_prefixes = var.workspace_s3_prefixes
wdl_workflow_s3_prefix = var.wdl_workflow_s3_prefix
job_policy_arns = var.job_policy_arns
extra_env_vars = var.extra_env_vars
docker_network = var.docker_network
call_cache = var.call_cache
output_status_json_files = var.output_status_json_files
sfn_notification_topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
tags = var.tags
source = "../swipe-sfn-batch-job"
app_name = var.app_name
batch_job_docker_image = var.batch_job_docker_image
batch_job_timeout_seconds = var.batch_job_timeout_seconds
miniwdl_dir = var.miniwdl_dir
workspace_s3_prefixes = var.workspace_s3_prefixes
wdl_workflow_s3_prefix = var.wdl_workflow_s3_prefix
job_policy_arns = var.job_policy_arns
extra_env_vars = var.extra_env_vars
docker_network = var.docker_network
call_cache = var.call_cache
output_status_json_files = var.output_status_json_files
sfn_notification_topic_arn = aws_sns_topic.sfn_notifications_topic[0].arn
tags = var.tags
}

module "sfn_io_helper" {
Expand Down

0 comments on commit e9f58e4

Please sign in to comment.