First pass at processing architecture #154

Merged 21 commits on Oct 5, 2023
1 change: 1 addition & 0 deletions cdk.json
@@ -16,6 +16,7 @@
},
"context": {
"env": "backup",
"usernames": ["sandoval"],
"dev": {
"sds_id": "dev"
},
147 changes: 147 additions & 0 deletions sds_data_manager/constructs/batch_compute_resources.py
@@ -0,0 +1,147 @@
"""
This module provides the FargateBatchResources class which sets up AWS Batch resources
utilizing Fargate as the compute environment. The resources include:
- IAM roles.
- Compute environment for AWS Batch.
- ECR repository for container images.
- Batch job queue and job definition.
"""
from aws_cdk import aws_batch as batch
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3 as s3
from constructs import Construct


class FargateBatchResources(Construct):
"""Fargate Batch compute environment with named Job Queue, and Job Definition.
"""

def __init__(self,
scope: Construct,
construct_id: str,
sds_id: str,
vpc: ec2.Vpc,
processing_step_name: str,
data_bucket: s3.Bucket,
repo: ecr.Repository,
batch_max_vcpus=10,
job_vcpus=0.25,
job_memory=512):
"""Constructor

Parameters
----------
scope : Construct
Parent construct.
construct_id : str
A unique string identifier for this construct.
sds_id : str
Name suffix for stack
vpc : ec2.Vpc
VPC into which to launch the compute instance.
processing_step_name : str
Name of data product being generated in this Batch job.
data_bucket : s3.Bucket
S3 bucket.
repo : ecr.Repository
Container repo
batch_max_vcpus : int, Optional
Maximum number of virtual CPUs per compute instance.
job_vcpus : int, Optional
Number of virtual CPUs required per Batch job.
Dependent on Docker image contents.
job_memory : int, Optional
Memory required per Batch job in MB. Dependent on Docker image contents.
"""
super().__init__(scope, construct_id)

self.role = iam.Role(self, f"BatchServiceRole-{sds_id}",
assumed_by=iam.ServicePrincipal('batch.amazonaws.com'),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSBatchServiceRole")
]
)

# Required since our task is hosted on AWS Fargate,
# is pulling container images from the ECR, and sending
# container logs to CloudWatch.
fargate_execution_role = iam.Role(self, f"FargateExecutionRole-{sds_id}",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
'service-role/AmazonECSTaskExecutionRolePolicy')
])

# Setup a security group for the Fargate-generated EC2 instances.
batch_security_group = ec2.SecurityGroup(self,
f"FargateInstanceSecurityGroup-{sds_id}",
vpc=vpc)

# PRIVATE_WITH_NAT allows batch job to pull images from the ECR.
# TODO: Evaluate SPOT resources
self.compute_environment = batch.CfnComputeEnvironment(
self, f"FargateBatchComputeEnvironment-{sds_id}",
type='MANAGED',
service_role=self.role.role_arn,
compute_resources=batch.CfnComputeEnvironment.ComputeResourcesProperty(
type='FARGATE',
Collaborator:
I think we had mentioned using SPOT resources as well at one point? Do we want to make this an option in the arguments to the setup so we can decide later? Or put a NOTE: here so we can reevaluate again later?

maxv_cpus=batch_max_vcpus,
subnets=vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT).subnet_ids,
security_group_ids=[batch_security_group.security_group_id]
)
)

# The set of compute environments mapped to a job queue
# and their order relative to each other
compute_environment_order = batch.CfnJobQueue.ComputeEnvironmentOrderProperty(
compute_environment=self.compute_environment.ref,
order=1)

repo.grant_pull_push(fargate_execution_role)

# Setup job queue
self.job_queue_name = f"{processing_step_name}-fargate-batch-job-queue"
self.job_queue = batch.CfnJobQueue(self, f"FargateBatchJobQueue-{sds_id}",
job_queue_name=self.job_queue_name,
priority=1,
compute_environment_order=[compute_environment_order])

# Batch job role, so we can later grant access to the appropriate
# S3 buckets and other resources
self.batch_job_role = iam.Role(self, f"BatchJobRole-{sds_id}",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess")])
data_bucket.grant_read_write(self.batch_job_role)

#TODO: come back and add ability to grab latest version of
# processing_step_name tag. I think this will require
# setting up a lambda. Maybe there's another way?
self.job_definition_name = f"fargate-batch-job-definition{processing_step_name}"
self.job_definition = batch.CfnJobDefinition(
self, f"FargateBatchJobDefinition-{sds_id}",
job_definition_name=self.job_definition_name,
type="CONTAINER",
platform_capabilities=['FARGATE'],
container_properties={
'image': f"{repo.repository_uri}:{processing_step_name}",
'resourceRequirements': [
{
'value': str(job_memory),
'type': 'MEMORY'
},
{
'value': str(job_vcpus),
'type': 'VCPU'
}
],
'executionRoleArn': fargate_execution_role.role_arn,
'jobRoleArn': self.batch_job_role.role_arn,
},
tags={
'Purpose': 'Batch Processing'
}
)
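
For context, a minimal sketch of how FargateBatchResources could be instantiated inside a CDK stack, based only on the constructor signature above; the ProcessingStack class and the VPC, bucket, and repository it creates are hypothetical placeholders, not part of this PR.

# Hypothetical usage sketch -- the stack and the resources it creates are placeholders.
from aws_cdk import Stack
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_s3 as s3
from constructs import Construct

from sds_data_manager.constructs.batch_compute_resources import FargateBatchResources


class ProcessingStack(Stack):  # hypothetical stack for illustration
    def __init__(self, scope: Construct, construct_id: str, sds_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        vpc = ec2.Vpc(self, f"ProcessingVpc-{sds_id}")           # assumed VPC
        data_bucket = s3.Bucket(self, f"DataBucket-{sds_id}")    # assumed data bucket
        repo = ecr.Repository(self, f"ProcessingRepo-{sds_id}")  # assumed container repo

        # Instantiate the construct defined in this file.
        self.batch_resources = FargateBatchResources(
            self, f"FargateBatch-{sds_id}",
            sds_id=sds_id,
            vpc=vpc,
            processing_step_name="l1b_codice",
            data_bucket=data_bucket,
            repo=repo,
        )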
69 changes: 69 additions & 0 deletions sds_data_manager/constructs/instrument_lambdas.py
@@ -0,0 +1,69 @@
"""Module containing constructs for instrumenting Lambda functions."""

from pathlib import Path

from aws_cdk import Duration
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_lambda_python_alpha as lambda_alpha_
from aws_cdk import aws_s3 as s3
from constructs import Construct


class InstrumentLambda(Construct):
"""Generic Construct with customizable runtime code
"""

def __init__(self,
scope: Construct,
construct_id: str,
processing_step_name: str,
data_bucket: s3.Bucket,
code_path: str | Path,
Contributor:
since this lambda is using code_path to get its code, should we also add ability to pass an optional argument for lambda layers if it needs more libraries?

Contributor Author:
Will do.

Contributor Author (@laspsandoval, Oct 2, 2023):
Will come back to this later.

instrument_target: str,
instrument_sources: str):
"""
InstrumentLambda Constructor.

Parameters
----------
scope : Construct
construct_id : str
processing_step_name : str
Processing step name
data_bucket: s3.Bucket
S3 bucket
code_path : str or Path
Path to the Lambda code directory
instrument_target : str
Target data product (i.e. expected product)
instrument_sources : str
Data product sources (i.e. dependencies)
"""

super().__init__(scope, construct_id)

# Define Lambda Environment Variables
# TODO: if we need more variables change so we can pass as input
lambda_environment = {
Contributor:
can we take lambda environment variable dictionary as input in case we need to add more environment variables?

"S3_BUCKET": f"{data_bucket.bucket_name}",
"S3_KEY_PATH": instrument_sources,
"INSTRUMENT_TARGET": instrument_target,
"PROCESSING_NAME": processing_step_name,
"OUTPUT_PATH": f"s3://{data_bucket.bucket_name}/{instrument_target}"
}

#TODO: Add Lambda layers for more libraries (or Dockerize)
self.instrument_lambda = lambda_alpha_.PythonFunction(
self,
id=f"InstrumentLambda-{processing_step_name}",
function_name=f"{processing_step_name}",
entry=str(code_path),
index=f"instruments/{instrument_target.lower()}.py",
handler="lambda_handler",
runtime=lambda_.Runtime.PYTHON_3_11,
timeout=Duration.seconds(10),
memory_size=512,
environment=lambda_environment
)

data_bucket.grant_read_write(self.instrument_lambda)
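
A minimal usage sketch for InstrumentLambda, assuming a parent stack that already owns the data bucket; the stack variable, code path, and source prefix below are illustrative placeholders, not part of this PR.

# Hypothetical usage sketch -- `stack`, `data_bucket`, and the source prefix
# are assumed to exist elsewhere in the application.
from pathlib import Path

from sds_data_manager.constructs.instrument_lambdas import InstrumentLambda

instrument_lambda = InstrumentLambda(
    stack,  # assumed parent Stack/Construct
    "InstrumentLambdaConstruct",
    processing_step_name="l1b_codice",
    data_bucket=data_bucket,  # assumed s3.Bucket created elsewhere
    code_path=Path("sds_data_manager/lambda_code/SDSCode"),  # matches the handler layout in this PR
    instrument_target="l1b_codice",  # resolves to instruments/l1b_codice.py
    instrument_sources="l0_codice",  # placeholder source prefix
)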
126 changes: 126 additions & 0 deletions sds_data_manager/constructs/sdc_step_function.py
@@ -0,0 +1,126 @@
"""
The state machine integrates with AWS Lambda and AWS Batch to execute processing
components.

Key Features:
- Configures AWS Step Functions tasks to invoke specific Lambda functions.
- Dynamically constructs ARNs for Batch job definitions and queues.
- Handles branching logic based on the success or failure of previous steps.
- Defines a comprehensive state machine for the entire data processing flow.
"""
from aws_cdk import Stack
Collaborator:
Might be good to include a module docstring here as well.

from aws_cdk import aws_s3 as s3
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks
from constructs import Construct

from sds_data_manager.constructs.batch_compute_resources import FargateBatchResources
from sds_data_manager.constructs.instrument_lambdas import InstrumentLambda


class SdcStepFunction(Construct):
"""Step Function Construct

Creates state machine using processing components.
"""

def __init__(self,
scope: Construct,
construct_id: str,
processing_step_name: str,
processing_system: InstrumentLambda,
batch_resources: FargateBatchResources,
instrument_target: str,
data_bucket: s3.Bucket):
"""SdcStepFunction Constructor

Parameters
----------
scope : Construct
construct_id : str
processing_step_name : str
The string identifier for the processing step
processing_system : InstrumentLambda
Instrument Lambda processing system
batch_resources: FargateBatchResources
Fargate compute environment
instrument_target : str
Target data product
data_bucket : s3.Bucket
S3 bucket
"""
super().__init__(scope, construct_id)

# Reformat EventBridge Inputs
add_specifics_to_input = sfn.Pass(
self, "Reformat EventBridge Inputs",
parameters={
"TIMEOUT_TIME.$": "$.time",
}
Comment on lines +57 to +59
Contributor:
should we add/get information about file name or path that the processing step function should use? What do we use TIMEOUT_TIME for?

Contributor Author:
You are right. We will add onto this in the Trigger code I'll write. TIMEOUT_TIME is just the current time. I'll keep it as a placeholder, but it will likely change.

)

# Step Functions Tasks to invoke Lambda function
instrument_task = tasks.LambdaInvoke(self,
f"InstrumentLambda-{processing_step_name}",
lambda_function=processing_system.instrument_lambda,
payload=sfn.TaskInput.from_object(
{"TIMEOUT_TIME.$": "$.TIMEOUT_TIME"}),
result_path="$.InstrumentOutput",
result_selector={
"STATE.$": "$.Payload.STATE",
"JOB_NAME.$": "$.Payload.JOB_NAME",
"COMMAND.$": "$.Payload.COMMAND",
"OUTPUT_PATH": "$.Payload.OUTPUT_PATH",
"INSTRUMENT_TARGET":
"$.Payload.INSTRUMENT_TARGET"
})

# Batch Job Inputs
stack = Stack.of(self)
job_definition_arn = \
f'arn:aws:batch:{stack.region}:{stack.account}:job-definition/' \
f'{batch_resources.job_definition_name}'
job_queue_arn = f'arn:aws:batch:{stack.region}:{stack.account}:job-queue/' \
f'{batch_resources.job_queue_name}'

instrument_target = f"{instrument_target}"

# Batch Job
submit_job = tasks.BatchSubmitJob(
self, f"BatchJob-{processing_step_name}",
job_name=sfn.JsonPath.string_at("$.InstrumentOutput.JOB_NAME"),
job_definition_arn=job_definition_arn,
job_queue_arn=job_queue_arn,
container_overrides=tasks.BatchContainerOverrides(
command=sfn.JsonPath.list_at("$.InstrumentOutput.COMMAND"),
environment={
"OUTPUT_PATH": data_bucket.bucket_name,
"INSTRUMENT_TARGET": instrument_target
}
),
result_path='$.BatchJobOutput'
)

# Success and Fail Final States
fail_state = sfn.Fail(self, "Fail State")

# Choice State
instrument_status = sfn.Choice(self, "Success?")
# Go to Batch job
created = sfn.Condition.string_equals("$.InstrumentOutput.STATE", "SUCCESS")
instrument_status.when(created, submit_job)
instrument_status.otherwise(fail_state)

submit_job.add_catch(fail_state)

# State sequences
add_specifics_to_input.next(
instrument_task).next(
instrument_status)

# Define the state machine
definition_body = sfn.DefinitionBody.from_chainable(add_specifics_to_input)
self.state_machine = sfn.StateMachine(self,
Contributor:
So we get a state from the lambda, a batch job is created (which will run the container?), then that is all put into the state machine which is run in this line?

Contributor Author:
Yes. The Batch job will run the container from the ECR (and there will be a different container for each instrument).

f"CDKProcessingStepStateMachine-{processing_step_name}",
definition_body=definition_body,
state_machine_name=f"{processing_step_name}-step-function")
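
A hypothetical sketch of wiring the three constructs from this PR together; the stack, instrument_lambda, batch_resources, and data_bucket variables are assumed to come from the earlier sketches and are not part of this PR.

# Hypothetical wiring sketch -- all referenced variables are assumed to exist.
from sds_data_manager.constructs.sdc_step_function import SdcStepFunction

step_function = SdcStepFunction(
    stack,  # assumed parent Stack/Construct
    "SdcStepFunctionConstruct",
    processing_step_name="l1b_codice",
    processing_system=instrument_lambda,  # InstrumentLambda instance
    batch_resources=batch_resources,      # FargateBatchResources instance
    instrument_target="l1b_codice",
    data_bucket=data_bucket,              # assumed s3.Bucket
)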
Empty file.
42 changes: 42 additions & 0 deletions sds_data_manager/lambda_code/SDSCode/instruments/l1b_codice.py
@@ -0,0 +1,42 @@
"""Lambda runtime code that triggers off of arrival of data into S3 bucket.
"""
import logging
import os
from datetime import datetime

import boto3

# Setup the logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

#TODO: ability to access database, EFS, calibration data, etc.

def lambda_handler(event: dict, context):
"""Handler function"""
"""Handler function"""
logger.info(f"Event: {event}")
logger.info(f"Context: {context}")

now = datetime.now()
logger.info(f"Now time is: {now}")

# Get the environment variables
bucket = os.environ['S3_BUCKET']
prefix = os.environ['S3_KEY_PATH']

# Retrieves objects in the S3 bucket under the given prefix
try:
s3 = boto3.client('s3')
object_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
logger.info(f"Object list: {object_list}")
except KeyError:
logger.warning("No files present.")

#TODO: this state will change based on availability of data
#TODO: we need to think about what needs to be passed into the container
return {
"STATE": "SUCCESS",
"JOB_NAME": os.environ['PROCESSING_NAME'],
'COMMAND': ["packet-ingest"]
}
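
A hypothetical local smoke test of this handler, assuming the lambda_code directories are importable as a package and valid AWS credentials are available for the boto3 call; the bucket and prefix values are placeholders.

# Hypothetical local smoke test -- bucket and prefix values are placeholders.
import os

from sds_data_manager.lambda_code.SDSCode.instruments import l1b_codice

os.environ["S3_BUCKET"] = "my-sds-data-bucket"  # placeholder bucket name
os.environ["S3_KEY_PATH"] = "l0_codice"         # placeholder source prefix
os.environ["PROCESSING_NAME"] = "l1b_codice"

# The step function passes TIMEOUT_TIME through from the EventBridge input.
result = l1b_codice.lambda_handler({"TIMEOUT_TIME": "2023-10-02T00:00:00Z"}, context=None)
print(result)  # expected: {"STATE": "SUCCESS", "JOB_NAME": "l1b_codice", "COMMAND": ["packet-ingest"]}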