Skip to content

Commit

Permalink
Decouple S3 to JSON workflow from JSON to Parquet workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Oct 12, 2023
1 parent 1c7196b commit 18f117d
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 46 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/upload-and-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ jobs:
- name: Invoke Lambda
run: |
cd src/lambda_function/s3_to_glue/
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=$NAMESPACE-PrimaryWorkflow"
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=$NAMESPACE-S3ToJsonWorkflow"
sceptre-deploy-staging:
Expand Down Expand Up @@ -358,4 +358,4 @@ jobs:
- name: Invoke Lambda
run: |
cd src/lambda_function/s3_to_glue/
sam local invoke -e events/records.json --parameter-overrides "PrimaryWorkflowName=staging-PrimaryWorkflow"
sam local invoke -e events/records.json --parameter-overrides "S3ToJsonWorkflowName=staging-S3ToJsonWorkflow"
2 changes: 1 addition & 1 deletion config/develop/namespaced/s3-to-glue-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
LambdaBatchSize: '10'
LambdaMaximumBatchingWindowInSeconds: '300'
2 changes: 1 addition & 1 deletion config/prod/namespaced/s3-to-glue-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
SQSQueueArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-S3ToLambda::PrimaryQueueArn"
S3ToGlueRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-to-glue-lambda-role::RoleArn"
PrimaryWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::WorkflowName"
S3ToJsonWorkflowName: !stack_output_external "{{ stack_group_config.namespace }}-glue-workflow::S3ToJsonWorkflowName"
LambdaBatchSize: '10'
LambdaMaximumBatchingWindowInSeconds: '300'
8 changes: 4 additions & 4 deletions src/lambda_function/s3_to_glue/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
This Lambda app responds to an SQS event notification and starts a Glue workflow.
The Glue workflow name is set by the environment variable `PRIMARY_WORKFLOW_NAME`.
The Glue workflow name is set by the environment variable `S3_TO_JSON_WORKFLOW_NAME`.
Subsequently, the S3 objects which were contained in the SQS event are written as a
JSON string to the `messages` workflow run property.
"""
Expand Down Expand Up @@ -141,14 +141,14 @@ def lambda_handler(event, context) -> dict:
if len(s3_objects_info) > 0:
logger.info(
"Submitting the following files to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
submit_s3_to_json_workflow(
objects_info=s3_objects_info,
workflow_name=os.environ["PRIMARY_WORKFLOW_NAME"]
workflow_name=os.environ["S3_TO_JSON_WORKFLOW_NAME"]
)
else:
logger.info(
"NO files were submitted to "
f"{os.environ['PRIMARY_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
f"{os.environ['S3_TO_JSON_WORKFLOW_NAME']}: {json.dumps(s3_objects_info)}"
)
4 changes: 2 additions & 2 deletions src/lambda_function/s3_to_glue/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Parameters:
Type: String
Description: Arn for the S3 to Glue Lambda Role

PrimaryWorkflowName:
S3ToJsonWorkflowName:
Type: String
Description: >
Name of the main glue workflow that runs glue jobs from S3 to JSON and JSON to Parquet
Expand Down Expand Up @@ -50,7 +50,7 @@ Resources:
Timeout: 30
Environment:
Variables:
PRIMARY_WORKFLOW_NAME: !Ref PrimaryWorkflowName
S3_TO_JSON_WORKFLOW_NAME: !Ref S3ToJsonWorkflowName
Events:
SQSEvent:
Type: SQS
Expand Down
2 changes: 1 addition & 1 deletion src/lambda_function/s3_to_glue/test-env-vars.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"S3ToGlueFunction": {
"PRIMARY_WORKFLOW_NAME": "main-PrimaryWorkflow"
"PRIMARY_WORKFLOW_NAME": "main-S3ToJsonWorkflow"
}
}
102 changes: 68 additions & 34 deletions templates/glue-workflow.j2
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >-
The primary workflow for processing RECOVER data. An outline of the workflow is below:
The two workflows for processing RECOVER data. An outline of each workflow is below:

S3 to JSON ->
(S3 to JSON) On-demand trigger (triggered by Lambda) ->
(S3 to JSON) S3 to JSON

and

(JSON to Parquet) Scheduled trigger ->
(JSON to Parquet) Crawler ->
(JSON to Parquet) EnrolledParticipants and SymptomLog ->
(JSON to Parquet) HealthKit ->
(JSON to Parquet) Fitbit ->
(JSON to Parquet) Google ->
(JSON to Parquet) Garmin ->
CompareParquetJob
(JSON to Parquet) CompareParquetJob (if Namespace != "main")

Parameters:

Expand Down Expand Up @@ -50,6 +56,15 @@ Parameters:
Type: String
Description: The name of the S3 To JSON Job

JsontoParquetTriggerSchedule:
Type: String
Description: >-
The cron schedule on which the JSON to Parquet workflow is triggered.
When `IsMainNamespace`, the respective trigger is active from the moment
of deployment. Otherwise, the trigger is disabled so that we don't waste
resources running our development pipelines every day.
Default: cron(0 2 * * ? *)

CompareParquetStagingNamespace:
Type: String
Description: the name of the "staging" namespace
Expand All @@ -59,7 +74,7 @@ Parameters:
Description: The name of the "main" namespace

Conditions:
IsStagingNamespace: !Not [!Equals [!Ref Namespace, "main"]]
IsMainNamespace: !Equals [!Ref Namespace, "main"]

Resources:

Expand All @@ -72,46 +87,65 @@ Resources:
{% do datasets.append(dataset) %}
{% endfor %}

PrimaryWorkflow:
S3ToJsonWorkflow:
Type: AWS::Glue::Workflow
Properties:
DefaultRunProperties:
namespace: !Ref Namespace
json_bucket: !Ref JsonBucketName
json_prefix: !Ref JsonKeyPrefix
parquet_bucket: !Ref ParquetBucketName
parquet_prefix: !Ref ParquetKeyPrefix
glue_database: !Ref GlueDatabase
Description: >-
Glue workflow for exporting RECOVER data to Parquet datasets
MaxConcurrentRuns: 1
Name: !Sub ${Namespace}-PrimaryWorkflow
Glue workflow for exporting raw data to their JSON datasets
Name: !Sub ${Namespace}-S3ToJsonWorkflow

InitialTrigger:
S3ToJsonTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-InitialTrigger"
Name: !Sub "${Namespace}-S3ToJsonTrigger"
Actions:
- JobName: !Ref S3ToJsonJobName
Description: This is the first trigger in the primary workflow.
Type: ON_DEMAND
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref S3ToJsonWorkflow

S3ToJsonCompleteTrigger:
JsonToParquetWorkflow:
Type: AWS::Glue::Workflow
Properties:
DefaultRunProperties:
namespace: !Ref Namespace
parquet_bucket: !Ref ParquetBucketName
parquet_prefix: !Ref ParquetKeyPrefix
glue_database: !Ref GlueDatabase
Description: >-
Glue workflow which loads the JSON datasets and writes to them to Parquet datasets
MaxConcurrentRuns: 1
Name: !Sub ${Namespace}-JsonToParquetWorkflow

JsontoParquetTrigger:
Condition: IsMainNamespace
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-S3ToJsonCompleteTrigger"
Name: !Sub "${Namespace}-JsontoParquetTrigger"
Actions:
- CrawlerName: !Ref StandardCrawler
Description: This trigger starts the crawler.
Type: CONDITIONAL
Predicate:
Conditions:
- JobName: !Ref S3ToJsonJobName
State: SUCCEEDED
LogicalOperator: EQUALS
Description: This trigger starts the JSON to Parquet workflow.
Type: SCHEDULED
Schedule: !Ref JsontoParquetTriggerSchedule
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

JsontoParquetTrigger:
Condition: !Not IsMainNamespace
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-JsontoParquetTrigger"
Actions:
- CrawlerName: !Ref StandardCrawler
Description: This trigger starts the JSON to Parquet workflow.
Type: SCHEDULED
Schedule: !Ref JsontoParquetTriggerSchedule
StartOnCreation: false
WorkflowName: !Ref JsonToParquetWorkflow

StandardCrawler:
Type: AWS::Glue::Crawler
Expand Down Expand Up @@ -150,7 +184,7 @@ Resources:
LogicalOperator: EQUALS
CrawlState: SUCCEEDED
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

HealthKitTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -172,7 +206,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

FitbitTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -194,7 +228,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

GoogleTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -216,7 +250,7 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

GarminTrigger:
Type: AWS::Glue::Trigger
Expand All @@ -238,11 +272,11 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

JsontoParquetCompleteTrigger:
Type: AWS::Glue::Trigger
Condition: IsStagingNamespace
Condition: !Not IsMainNamespace
Properties:
Name: !Sub "${Namespace}-JsontoParquetCompleteTrigger"
Actions:
Expand All @@ -266,11 +300,11 @@ Resources:
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow
WorkflowName: !Ref JsonToParquetWorkflow

Outputs:

WorkflowName:
Value: !Ref PrimaryWorkflow
S3ToJsonWorkflowName:
Value: !Ref S3ToJsonWorkflow
Export:
Name: !Sub '${AWS::Region}-${AWS::StackName}-WorkflowName'
Name: !Sub '${AWS::Region}-${AWS::StackName}-S3ToJsonWorkflowName'
2 changes: 1 addition & 1 deletion tests/test_s3_to_glue_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def object_info(self):

@pytest.fixture
def set_env_var(self, monkeypatch, sqs_queue):
monkeypatch.setenv("PRIMARY_WORKFLOW_NAME", "test_workflow")
monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow")

def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockGlueClient())
Expand Down

0 comments on commit 18f117d

Please sign in to comment.