First pass at processing architecture (#154)
* first pass at processing
laspsandoval authored Oct 5, 2023
1 parent f39b681 commit 3e48333
Showing 12 changed files with 697 additions and 11 deletions.
1 change: 1 addition & 0 deletions cdk.json
@@ -16,6 +16,7 @@
},
"context": {
"env": "backup",
"usernames": ["sandoval"],
"dev": {
"sds_id": "dev"
},
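The new "usernames" context entry can be read by the CDK app at synth time. A minimal sketch, assuming a hypothetical app.py entry point that is not shown in this commit:

# Hypothetical app.py snippet -- not part of this commit; shows one way the
# "usernames" context entry could be consumed at synth time.
import aws_cdk as cdk

app = cdk.App()
usernames = app.node.try_get_context("usernames") or []
for username in usernames:
    # Placeholder: per-user resources would be created here.
    print(f"Configuring resources for user: {username}")
app.synth()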
147 changes: 147 additions & 0 deletions sds_data_manager/constructs/batch_compute_resources.py
@@ -0,0 +1,147 @@
"""
This module provides the FargateBatchResources class which sets up AWS Batch resources
utilizing Fargate as the compute environment. The resources include:
- IAM roles.
- Compute environment for AWS Batch.
- ECR repository for container images.
- Batch job queue and job definition.
"""
from aws_cdk import aws_batch as batch
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3 as s3
from constructs import Construct


class FargateBatchResources(Construct):
"""Fargate Batch compute environment with named Job Queue, and Job Definition.
"""

def __init__(self,
scope: Construct,
construct_id: str,
sds_id: str,
vpc: ec2.Vpc,
processing_step_name: str,
data_bucket: s3.Bucket,
repo: ecr.Repository,
batch_max_vcpus=10,
job_vcpus=0.25,
job_memory=512):
"""Constructor
Parameters
----------
scope : Construct
Parent construct.
construct_id : str
A unique string identifier for this construct.
sds_id : str
Name suffix for stack
vpc : ec2.Vpc
VPC into which to launch the compute instance.
processing_step_name : str
Name of data product being generated in this Batch job.
data_bucket : s3.Bucket
S3 bucket.
repo : ecr.Repository
Container repository.
batch_max_vcpus : int, Optional
Maximum number of virtual CPUs per compute instance.
job_vcpus : int, Optional
Number of virtual CPUs required per Batch job.
Dependent on Docker image contents.
job_memory : int, Optional
Memory required per Batch job in MB. Dependent on Docker image contents.
"""
super().__init__(scope, construct_id)

self.role = iam.Role(self, f"BatchServiceRole-{sds_id}",
assumed_by=iam.ServicePrincipal('batch.amazonaws.com'),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSBatchServiceRole")
]
)

# Required since our task is hosted on AWS Fargate,
# pulls container images from ECR, and sends
# container logs to CloudWatch.
fargate_execution_role = iam.Role(self, f"FargateExecutionRole-{sds_id}",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
'service-role/AmazonECSTaskExecutionRolePolicy')
])

# Set up a security group for the Fargate-generated EC2 instances.
batch_security_group = ec2.SecurityGroup(self,
f"FargateInstanceSecurityGroup-{sds_id}",
vpc=vpc)

# PRIVATE_WITH_NAT allows the Batch job to pull images from ECR.
# TODO: Evaluate SPOT resources
self.compute_environment = batch.CfnComputeEnvironment(
self, f"FargateBatchComputeEnvironment-{sds_id}",
type='MANAGED',
service_role=self.role.role_arn,
compute_resources=batch.CfnComputeEnvironment.ComputeResourcesProperty(
type='FARGATE',
maxv_cpus=batch_max_vcpus,
subnets=vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT).subnet_ids,
security_group_ids=[batch_security_group.security_group_id]
)
)

# The set of compute environments mapped to a job queue
# and their order relative to each other
compute_environment_order = batch.CfnJobQueue.ComputeEnvironmentOrderProperty(
compute_environment=self.compute_environment.ref,
order=1)

repo.grant_pull_push(fargate_execution_role)

# Set up the job queue
self.job_queue_name = f"{processing_step_name}-fargate-batch-job-queue"
self.job_queue = batch.CfnJobQueue(self, f"FargateBatchJobQueue-{sds_id}",
job_queue_name=self.job_queue_name,
priority=1,
compute_environment_order=[compute_environment_order])

# Batch job role, so we can later grant access to the appropriate
# S3 buckets and other resources
self.batch_job_role = iam.Role(self, f"BatchJobRole-{sds_id}",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess")])
data_bucket.grant_read_write(self.batch_job_role)

#TODO: come back and add ability to grab latest version of
# processing_step_name tag. I think this will require
# setting up a lambda. Maybe there's another way?
self.job_definition_name = f"fargate-batch-job-definition{processing_step_name}"
self.job_definition = batch.CfnJobDefinition(
self, f"FargateBatchJobDefinition-{sds_id}",
job_definition_name=self.job_definition_name,
type="CONTAINER",
platform_capabilities=['FARGATE'],
container_properties={
'image': f"{repo.repository_uri}:{processing_step_name}",
'resourceRequirements': [
{
'value': str(job_memory),
'type': 'MEMORY'
},
{
'value': str(job_vcpus),
'type': 'VCPU'
}
],
'executionRoleArn': fargate_execution_role.role_arn,
'jobRoleArn': self.batch_job_role.role_arn,
},
tags={
'Purpose': 'Batch Processing'
}
)
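A minimal usage sketch for this construct, wrapped in an illustrative parent stack; the VPC, bucket, repository, and processing step names below are assumptions rather than values taken from this commit:

# Hypothetical wiring example -- names are illustrative, not from this commit.
from aws_cdk import Stack
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_s3 as s3
from constructs import Construct

from sds_data_manager.constructs.batch_compute_resources import FargateBatchResources


class ExampleProcessingStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, sds_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Supporting resources the construct expects; a real stack may import these instead.
        vpc = ec2.Vpc(self, f"ProcessingVpc-{sds_id}", max_azs=2)
        data_bucket = s3.Bucket(self, f"DataBucket-{sds_id}")
        repo = ecr.Repository(self, f"ProcessingRepo-{sds_id}")

        self.batch_resources = FargateBatchResources(
            self, f"FargateBatchEnvironment-{sds_id}",
            sds_id=sds_id,
            vpc=vpc,
            processing_step_name="l1b_codice",
            data_bucket=data_bucket,
            repo=repo,
        )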
69 changes: 69 additions & 0 deletions sds_data_manager/constructs/instrument_lambdas.py
@@ -0,0 +1,69 @@
"""Module containing constructs for instrumenting Lambda functions."""

from pathlib import Path

from aws_cdk import Duration
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_lambda_python_alpha as lambda_alpha_
from aws_cdk import aws_s3 as s3
from constructs import Construct


class InstrumentLambda(Construct):
"""Generic Construct with customizable runtime code
"""

def __init__(self,
scope: Construct,
construct_id: str,
processing_step_name: str,
data_bucket: s3.Bucket,
code_path: str or Path,
instrument_target: str,
instrument_sources: str):
"""
InstrumentLambda Constructor.
Parameters
----------
scope : Construct
construct_id : str
processing_step_name : str
Processing step name
data_bucket: s3.Bucket
S3 bucket
code_path : str or Path
Path to the Lambda code directory
instrument_target : str
Target data product (i.e. expected product)
instrument_sources : str
Data product sources (i.e. dependencies)
"""

super().__init__(scope, construct_id)

# Define Lambda Environment Variables
# TODO: if we need more variables, change this so they can be passed in as input
lambda_environment = {
"S3_BUCKET": f"{data_bucket.bucket_name}",
"S3_KEY_PATH": instrument_sources,
"INSTRUMENT_TARGET": instrument_target,
"PROCESSING_NAME": processing_step_name,
"OUTPUT_PATH": f"s3://{data_bucket.bucket_name}/{instrument_target}"
}

#TODO: Add Lambda layers for more libraries (or Dockerize)
self.instrument_lambda = lambda_alpha_.PythonFunction(
self,
id=f"InstrumentLambda-{processing_step_name}",
function_name=f"{processing_step_name}",
entry=str(code_path),
index=f"instruments/{instrument_target.lower()}.py",
handler="lambda_handler",
runtime=lambda_.Runtime.PYTHON_3_11,
timeout=Duration.seconds(10),
memory_size=512,
environment=lambda_environment
)

data_bucket.grant_read_write(self.instrument_lambda)
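A short sketch of how this construct might be instantiated; the helper function, code path, and instrument names are assumptions chosen to line up with the l1b_codice handler added later in this commit:

# Hypothetical helper -- parameter values are illustrative assumptions.
from pathlib import Path

from aws_cdk import aws_s3 as s3
from constructs import Construct

from sds_data_manager.constructs.instrument_lambdas import InstrumentLambda


def add_codice_lambda(scope: Construct, data_bucket: s3.Bucket) -> InstrumentLambda:
    """Attach an InstrumentLambda for the CoDICE L1B step to a parent stack/construct."""
    # Assumes this helper lives next to the lambda_code directory in the repo layout.
    lambda_code_directory = Path(__file__).parent.parent / "lambda_code" / "SDSCode"
    return InstrumentLambda(
        scope, "InstrumentLambda-l1b_codice",
        processing_step_name="l1b_codice",
        data_bucket=data_bucket,
        code_path=str(lambda_code_directory),
        # index resolves to instruments/l1b_codice.py inside the code directory
        instrument_target="l1b_codice",
        instrument_sources="l0_codice",
    )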
126 changes: 126 additions & 0 deletions sds_data_manager/constructs/sdc_step_function.py
@@ -0,0 +1,126 @@
"""
The state machine integrates with AWS Lambda and AWS Batch to execute processing
components.
Key Features:
- Configures AWS Step Functions tasks to invoke specific Lambda functions.
- Dynamically constructs ARNs for Batch job definitions and queues.
- Handles branching logic based on the success or failure of previous steps.
- Defines a comprehensive state machine for the entire data processing flow.
"""
from aws_cdk import Stack
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks
from constructs import Construct

from sds_data_manager.constructs.batch_compute_resources import FargateBatchResources
from sds_data_manager.constructs.instrument_lambdas import InstrumentLambda


class SdcStepFunction(Construct):
"""Step Function Construct
Creates state machine using processing components.
"""

def __init__(self,
scope: Construct,
construct_id: str,
processing_step_name: str,
processing_system: InstrumentLambda,
batch_resources: FargateBatchResources,
instrument_target: str,
data_bucket: s3.Bucket):
"""SdcStepFunction Constructor
Parameters
----------
scope : Construct
construct_id : str
processing_step_name : str
The string identifier for the processing step
processing_system : InstrumentLambda
Instrument Lambda construct
batch_resources: FargateBatchResources
Fargate compute environment
instrument_target : str
Target data product
data_bucket : s3.Bucket
S3 bucket
"""
super().__init__(scope, construct_id)

# Reformat EventBridge Inputs
add_specifics_to_input = sfn.Pass(
self, "Reformat EventBridge Inputs",
parameters={
"TIMEOUT_TIME.$": "$.time",
}
)

# Step Functions Tasks to invoke Lambda function
instrument_task = tasks.LambdaInvoke(self,
f"InstrumentLambda-{processing_step_name}",
lambda_function=processing_system.instrument_lambda,
payload=sfn.TaskInput.from_object(
{"TIMEOUT_TIME.$": "$.TIMEOUT_TIME"}),
result_path="$.InstrumentOutput",
result_selector={
"STATE.$": "$.Payload.STATE",
"JOB_NAME.$": "$.Payload.JOB_NAME",
"COMMAND.$": "$.Payload.COMMAND",
"OUTPUT_PATH": "$.Payload.OUTPUT_PATH",
"INSTRUMENT_TARGET":
"$.Payload.INSTRUMENT_TARGET"
})

# Batch Job Inputs
stack = Stack.of(self)
job_definition_arn = \
f'arn:aws:batch:{stack.region}:{stack.account}:job-definition/' \
f'{batch_resources.job_definition_name}'
job_queue_arn = f'arn:aws:batch:{stack.region}:{stack.account}:job-queue/' \
f'{batch_resources.job_queue_name}'

instrument_target = f"{instrument_target}"

# Batch Job
submit_job = tasks.BatchSubmitJob(
self, f"BatchJob-{processing_step_name}",
job_name=sfn.JsonPath.string_at("$.InstrumentOutput.JOB_NAME"),
job_definition_arn=job_definition_arn,
job_queue_arn=job_queue_arn,
container_overrides=tasks.BatchContainerOverrides(
command=sfn.JsonPath.list_at("$.InstrumentOutput.COMMAND"),
environment={
"OUTPUT_PATH": data_bucket.bucket_name,
"INSTRUMENT_TARGET": instrument_target
}
),
result_path='$.BatchJobOutput'
)

# Success and Fail Final States
fail_state = sfn.Fail(self, "Fail State")

# Choice State
instrument_status = sfn.Choice(self, "Success?")
# Go to Batch job
created = sfn.Condition.string_equals("$.InstrumentOutput.STATE", "SUCCESS")
instrument_status.when(created, submit_job)
instrument_status.otherwise(fail_state)

submit_job.add_catch(fail_state)

# State sequences
add_specifics_to_input.next(
instrument_task).next(
instrument_status)

# Define the state machine
definition_body = sfn.DefinitionBody.from_chainable(add_specifics_to_input)
self.state_machine = sfn.StateMachine(self,
f"CDKProcessingStepStateMachine-{processing_step_name}",
definition_body=definition_body,
state_machine_name=f"{processing_step_name}-step-function")
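A sketch (assumed wiring, not part of this commit) of how the Lambda, Batch, and Step Function constructs fit together, plus the shape of the EventBridge-style input the first Pass state expects; the helper name and sample timestamp are illustrative:

# Hypothetical wiring -- construct IDs and the sample input are assumptions.
from aws_cdk import aws_s3 as s3
from constructs import Construct

from sds_data_manager.constructs.batch_compute_resources import FargateBatchResources
from sds_data_manager.constructs.instrument_lambdas import InstrumentLambda
from sds_data_manager.constructs.sdc_step_function import SdcStepFunction


def add_codice_step_function(scope: Construct,
                             instrument_lambda: InstrumentLambda,
                             batch_resources: FargateBatchResources,
                             data_bucket: s3.Bucket) -> SdcStepFunction:
    """Chain the Lambda and Batch resources into a single Step Function."""
    return SdcStepFunction(
        scope, "SdcStepFunction-l1b_codice",
        processing_step_name="l1b_codice",
        processing_system=instrument_lambda,
        batch_resources=batch_resources,
        instrument_target="l1b_codice",
        data_bucket=data_bucket,
    )


# The "Reformat EventBridge Inputs" Pass state reads "$.time", so an execution
# started from EventBridge (or manually) needs an input like:
#   {"time": "2023-10-05T12:00:00Z"}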
Empty file.
42 changes: 42 additions & 0 deletions sds_data_manager/lambda_code/SDSCode/instruments/l1b_codice.py
@@ -0,0 +1,42 @@
"""Lambda runtime code that triggers off of arrival of data into S3 bucket.
"""
import logging
import os
from datetime import datetime

import boto3

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#TODO: ability to access database, EFS, calibration data, etc.

def lambda_handler(event: dict, context):
"""Handler function"""
"""Handler function"""
logger.info(f"Event: {event}")
logger.info(f"Context: {context}")

now = datetime.now()
logger.info(f"Now time is: {now}")

# Get the environment variables
bucket = os.environ['S3_BUCKET']
prefix = os.environ['S3_KEY_PATH']

# Retrieves objects in the S3 bucket under the given prefix
try:
s3 = boto3.client('s3')
object_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
logger.info(f"Object list: {object_list}")
except KeyError:
logger.warning("No files present.")

#TODO: this state will change based on availability of data
#TODO: we need to think about what needs to be passed into the container
return {
"STATE": "SUCCESS",
"JOB_NAME": os.environ['PROCESSING_NAME'],
'COMMAND': ["packet-ingest"]
}
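A minimal local smoke test for this handler, under the assumption that AWS credentials and the referenced bucket exist; the bucket and prefix values are placeholders, not project configuration:

# Hypothetical local test -- requires AWS credentials and an existing bucket.
import os

os.environ.setdefault("S3_BUCKET", "example-sds-data-bucket")
os.environ.setdefault("S3_KEY_PATH", "l0_codice")
os.environ.setdefault("PROCESSING_NAME", "l1b_codice")

from sds_data_manager.lambda_code.SDSCode.instruments.l1b_codice import lambda_handler

if __name__ == "__main__":
    # EventBridge-style payload with the field the state machine forwards.
    result = lambda_handler({"TIMEOUT_TIME": "2023-10-05T12:00:00Z"}, context=None)
    print(result)  # expected shape: {"STATE": "SUCCESS", "JOB_NAME": ..., "COMMAND": ["packet-ingest"]}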