First pass at processing architecture #154

Merged
merged 21 commits into from
Oct 5, 2023
Changes from 12 commits
133 changes: 133 additions & 0 deletions sds_data_manager/constructs/batch_compute_resources.py
@@ -0,0 +1,133 @@
from aws_cdk import RemovalPolicy
Collaborator:

It might be good to have a module docstring here that describes the purpose of the module.
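One possible wording, offered only as a sketch (the text is an assumption, not from this PR):

"""AWS CDK constructs for Fargate-based Batch processing resources.

Defines the compute environment, job queue, ECR repository, and job
definition used by the SDC processing step functions.
"""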

from aws_cdk import aws_batch as batch
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecr as ecr
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3 as s3
from constructs import Construct


class FargateBatchResources(Construct):
"""Fargate Batch compute environment with named Job Queue, and Job Definition.
"""

def __init__(self,
scope: Construct,
construct_id: str,
sds_id: str,
vpc: ec2.Vpc,
processing_step_name: str,
archive_bucket: s3.Bucket,
security_group: ec2.SecurityGroup,
batch_max_vcpus=10,
job_vcpus=0.25,
job_memory=512):
"""Constructor

Parameters
----------
scope : Construct
construct_id : str
Collaborator:

I think these are missing descriptions?
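A possible wording for those entries (an assumption, not from this PR):

scope : Construct
Parent construct.
construct_id : str
A unique string identifier for this construct.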

sds_id : str
Name suffix for stack
vpc : ec2.Vpc
VPC into which to launch the compute instance.
processing_step_name : str
Name of data product being generated in this Batch job.
archive_bucket : s3.Bucket
S3 bucket.
security_group : ec2.SecurityGroup
Batch processing security group.
batch_max_vcpus : int, Optional
Maximum number of virtual CPUs per compute instance.
job_vcpus : int, Optional
Number of virtual CPUs required per Batch job.
Dependent on Docker image contents.
job_memory : int, Optional
Memory required per Batch job in MB. Dependent on Docker image contents.
"""
super().__init__(scope, construct_id)

self.role = iam.Role(self, f"BatchServiceRole-{sds_id}",
assumed_by=iam.ServicePrincipal('batch.amazonaws.com'),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSBatchServiceRole")
]
)

# Required since our task is hosted on AWS Fargate,
# is pulling container images from the ECR, and sending
# container logs to CloudWatch.
fargate_execution_role = iam.Role(self, f"FargateExecutionRole-{sds_id}",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
'service-role/AmazonECSTaskExecutionRolePolicy')
])

# PRIVATE_WITH_NAT allows batch job to pull images from the ECR.
self.compute_environment = batch.CfnComputeEnvironment(
self, f"FargateBatchComputeEnvironment-{sds_id}",
type='MANAGED',
service_role=self.role.role_arn,
compute_resources=batch.CfnComputeEnvironment.ComputeResourcesProperty(
type='FARGATE',
Collaborator:

I think we had mentioned using SPOT resources as well at one point? Do we want to make this an option in the arguments to the setup so we can decide later? Or put a NOTE: here so we can reevaluate later?
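A hedged sketch of how that option could look; the use_fargate_spot parameter name is an assumption, not part of this PR:

# Assumed extra constructor argument: use_fargate_spot: bool = False
compute_resources=batch.CfnComputeEnvironment.ComputeResourcesProperty(
    type='FARGATE_SPOT' if use_fargate_spot else 'FARGATE',
    maxv_cpus=batch_max_vcpus,
    subnets=vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT).subnet_ids,
    security_group_ids=[security_group.security_group_id]
)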

maxv_cpus=batch_max_vcpus,
subnets=vpc.select_subnets(subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT).subnet_ids,
security_group_ids=[security_group.security_group_id]
)
)

# The set of compute environments mapped to a job queue
# and their order relative to each other
compute_environment_order = batch.CfnJobQueue.ComputeEnvironmentOrderProperty(
compute_environment=self.compute_environment.ref,
order=1)

# Define registry for storing processing docker images
self.container_registry = ecr.Repository(self, f"BatchRepository-{sds_id}",
Contributor:

Are we creating the ECR repository in this batch job stack? Should we move that out into its own stack?

Collaborator:

Do we want these as private registries, or will we need to create a public registry for our L3 code, in which case we should make one public registry with each of these repositories under that public namespace?

Contributor (Author):

I think we settled on

  1. keep it private for now
  2. have different repos for instruments but not levels (but that change will come later)

Does that sound ok? So basically keeping it as is for now, but with these goals in mind.

Collaborator:

Yes, I think that is what I remember from that conversation as well :)

repository_name=f"{processing_step_name.lower()}-repo",
image_scan_on_push=True)

self.container_registry.apply_removal_policy(RemovalPolicy.DESTROY)
self.container_registry.grant_pull(fargate_execution_role)

# Setup job queue
self.job_queue_name = f"{processing_step_name}-fargate-batch-job-queue"
self.job_queue = batch.CfnJobQueue(self, f"FargateBatchJobQueue-{sds_id}",
job_queue_name=self.job_queue_name,
priority=1,
compute_environment_order=[compute_environment_order])

# Batch job role, so we can later grant access to the appropriate
# S3 buckets and other resources
self.batch_job_role = iam.Role(self, f"BatchJobRole-{sds_id}",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess")])
archive_bucket.grant_read_write(self.batch_job_role)
Collaborator:

If we are going through the APIs for file upload/download, do we need S3 access?

It might be nice to keep this here in case we want to mount our S3 bucket into the container and avoid those uploads/downloads if we don't need them, so just something to think about.

Contributor (Author):

Nice idea!


# create job definition
self.job_definition_name = f"fargate-batch-job-definition{processing_step_name}"
self.job_definition = batch.CfnJobDefinition(
self, f"FargateBatchJobDefinition-{sds_id}",
job_definition_name=self.job_definition_name,
type="CONTAINER",
platform_capabilities=['FARGATE'],
container_properties={
'image': self.container_registry.repository_uri,
Collaborator:

Do we need to include a tag as well?
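A hedged option if a tag is added later; the latest tag is an assumption, not part of this PR:

'image': f"{self.container_registry.repository_uri}:latest",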

'resourceRequirements': [
{
'value': str(job_memory),
'type': 'MEMORY'
},
{
'value': str(job_vcpus),
'type': 'VCPU'
}
],
'executionRoleArn': fargate_execution_role.role_arn,
'jobRoleArn': self.batch_job_role.role_arn,
},
)
67 changes: 67 additions & 0 deletions sds_data_manager/constructs/instrument_lambdas.py
@@ -0,0 +1,67 @@
"""Module containing Constructs for instsrument Lambda functions"""
from pathlib import Path

from aws_cdk import Duration
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_lambda_python_alpha as lambda_alpha_
from aws_cdk import aws_s3 as s3
from constructs import Construct


class InstrumentLambda(Construct):
"""Generic Construct with customizable runtime code
"""

def __init__(self,
scope: Construct,
construct_id: str,
sds_id: str,
processing_step_name: str,
archive_bucket: s3.Bucket,
code_path: str or Path,
Contributor:

Since this Lambda is using code_path to get its code, should we also add the ability to pass an optional argument for Lambda layers in case it needs more libraries?

Contributor (Author):

Will do.

Contributor (Author) @laspsandoval, Oct 2, 2023:

Will come back to this later.
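A hedged sketch of the optional layers argument; the parameter name layers is an assumption, not in this PR:

# Assumed extra constructor argument:
#     layers: list = None,
# forwarded to the PythonFunction below as:
#     layers=layers or [],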

instrument_target: str,
instrument_sources: str):
"""InstrumentLambda Constructor

Parameters
----------
scope : Construct
construct_id : str
sds_id : str
Name suffix for stack
processing_step_name : str
Processing step name
archive_bucket : s3.Bucket
Contributor:

Can you give some background on why we are using archive_bucket in these constructs? Just curious. I feel like data used in the processing pipeline should be coming from the data bucket itself. But in the architecture diagram, we are sending data to a database (e.g., OpenSearch) and to the archival bucket, so I thought you were passing that argument for that. Is that right?

Contributor (Author):

Yep. We discussed this. It should be the same bucket. Sorry for the confusion.

S3 bucket
code_path : str or Path
Path to the Lambda code directory
instrument_target : str
Target data product
instrument_sources : str
Data product sources
Contributor:

Can you give an example in the doc string?
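A hedged example that could be added to the docstring; the values are hypothetical, patterned on the l1b_Codice Lambda in this PR:

# instrument_target = "l1b_Codice"   # selects instruments/l1b_Codice.py as the Lambda index
# instrument_sources = "l1a_Codice"  # used as the S3 prefix when listing input objects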

"""
super().__init__(scope, construct_id)

# Create Environment Variables
lambda_environment = {
Contributor:

Can we take a Lambda environment-variable dictionary as input in case we need to add more environment variables?
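A hedged sketch; the extra_environment parameter name is an assumption, not in this PR:

# Assumed extra constructor argument:
#     extra_environment: dict = None,
# merged in after the defaults below:
#     lambda_environment.update(extra_environment or {})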

"PROCESSING_PATH": f"archive-{sds_id}",
"INSTRUMENT_SOURCES": instrument_sources,
"INSTRUMENT_TARGET": instrument_target,
"PROCESSING_NAME": processing_step_name,
"OUTPUT_PATH":f"s3://{archive_bucket.bucket_name}/{instrument_target}"
Contributor:

I feel like the processing path and output path should point to the data bucket and not the archival bucket; the archival bucket syncs with the data bucket.

}

self.instrument_lambda = lambda_alpha_.PythonFunction(
self,
id=f"InstrumentLambda-{processing_step_name}",
function_name=f"{processing_step_name}",
entry=str(code_path),
index=f"instruments/{instrument_target}.py",
handler="lambda_handler",
Contributor:

That's interesting. I didn't know you could break up the path and handler like this. So entry points to a parent directory, index points to the code file path within that directory, and handler refers to a function in the code file. Is that right?

Contributor (Author):

yes :)

runtime=lambda_.Runtime.PYTHON_3_9,
timeout=Duration.minutes(10),
memory_size=1000,
Contributor:

Can we take those values as input too, to make this construct more flexible?

Contributor (Author):

Yes. Those are the defaults, but we can pass in anything we would like.

Collaborator:

Suggested change
- runtime=lambda_.Runtime.PYTHON_3_9,
- timeout=Duration.minutes(10),
- memory_size=1000,
+ runtime=lambda_.Runtime.PYTHON_3_11,
+ timeout=Duration.seconds(10),
+ memory_size=512,

This is the lambda that is just checking whether files are available, right? Let's make sure this is fast and set a low timeout and memory use unless we find it is failing.

environment=lambda_environment
)

archive_bucket.grant_read_write(self.instrument_lambda)
116 changes: 116 additions & 0 deletions sds_data_manager/constructs/sdc_step_function.py
@@ -0,0 +1,116 @@
from aws_cdk import Stack
Collaborator:

Might be good to include a module docstring here as well
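One possible wording, offered only as a sketch (the text is an assumption, not from this PR):

"""AWS CDK construct for the SDC processing step function.

Chains the instrument Lambda and the Fargate Batch job into a single
state machine for one processing step.
"""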

from aws_cdk import aws_s3 as s3
from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_stepfunctions_tasks as tasks
from constructs import Construct

from sds_data_manager.constructs.batch_compute_resources import FargateBatchResources
from sds_data_manager.constructs.instrument_lambdas import InstrumentLambda


class SdcStepFunction(Construct):
"""Step Function Construct

Creates a state machine using the processing components.
"""

def __init__(self,
scope: Construct,
construct_id: str,
processing_step_name: str,
processing_system: InstrumentLambda,
batch_resources: FargateBatchResources,
instrument_target: str,
archive_bucket: s3.Bucket):
"""SdcStepFunction Constructor

Parameters
----------
scope : Construct
construct_id : str
processing_step_name : str
The string identifier for the processing step
processing_system : InstrumentLambda
Instrument Lambda construct
batch_resources : FargateBatchResources
Fargate compute environment
instrument_target : str
Target data product
archive_bucket : s3.Bucket
S3 bucket
"""
super().__init__(scope, construct_id)
print(archive_bucket.bucket_name)
Contributor:

did you mean to remove this?

# Reformat EventBridge Inputs
add_specifics_to_input = sfn.Pass(
self, "Reformat EventBridge Inputs",
parameters={
"TIMEOUT_TIME.$": "$.time",
}
Contributor (on lines +57 to +59):

Should we add/get information about the file name or path that the processing step function should use? What do we use TIMEOUT_TIME for?

Contributor (Author):

You are right. We will add to this in the trigger code I'll write. TIMEOUT_TIME is just the current time. I'll keep it as a placeholder, but it will likely change.
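A hedged sketch of what the reformatted input might eventually carry once the trigger code lands; the $.detail.object.key path assumes an S3 "Object Created" EventBridge event and is not part of this PR:

parameters={
    "TIMEOUT_TIME.$": "$.time",
    "FILENAME.$": "$.detail.object.key",
}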

)

# Step Functions Tasks to invoke Lambda function
instrument_task = tasks.LambdaInvoke(self,
f"InstrumentLambda-{processing_step_name}",
lambda_function=processing_system.instrument_lambda,
payload=sfn.TaskInput.from_object(
{"TIMEOUT_TIME.$": "$.TIMEOUT_TIME"}),
result_path="$.InstrumentOutput",
result_selector={
"STATE.$": "$.Payload.STATE",
"JOB_NAME.$": "$.Payload.JOB_NAME",
"COMMAND.$": "$.Payload.COMMAND",
"OUTPUT_PATH": "$.Payload.OUTPUT_PATH",
"INSTRUMENT_TARGET":
"$.Payload.INSTRUMENT_TARGET"
})

# Batch Job Inputs
stack = Stack.of(self)
job_definition_arn = \
f'arn:aws:batch:{stack.region}:{stack.account}:job-definition/' \
f'{batch_resources.job_definition_name}'
job_queue_arn = f'arn:aws:batch:{stack.region}:{stack.account}:job-queue/' \
f'{batch_resources.job_queue_name}'

instrument_target = f"{instrument_target}"

# Batch Job
submit_job = tasks.BatchSubmitJob(
self, f"BatchJob-{processing_step_name}",
job_name=sfn.JsonPath.string_at("$.InstrumentOutput.JOB_NAME"),
job_definition_arn=job_definition_arn,
job_queue_arn=job_queue_arn,
container_overrides=tasks.BatchContainerOverrides(
command=sfn.JsonPath.list_at("$.InstrumentOutput.COMMAND"),
environment={
"OUTPUT_PATH": archive_bucket.bucket_name,
"INSTRUMENT_TARGET": instrument_target
}
),
result_path='$.BatchJobOutput'
)

# Success and Fail Final States
fail_state = sfn.Fail(self, "Fail State")

# Choice State
instrument_status = sfn.Choice(self, "Success?")
# Go to Batch job
created = sfn.Condition.string_equals("$.InstrumentOutput.STATE", "SUCCESS")
instrument_status.when(created, submit_job)
instrument_status.otherwise(fail_state)

submit_job.add_catch(fail_state)

# State sequences
add_specifics_to_input.next(
instrument_task).next(
instrument_status)

# Define the state machine
definition_body = sfn.DefinitionBody.from_chainable(add_specifics_to_input)
self.state_machine = sfn.StateMachine(self,
Contributor:

So we get a state from the lambda, a batch job is created (which will run the container?), and then that is all put into the state machine, which is run on this line?

Contributor (Author):

Yes. The Batch job will run the container from the ECR (and there will be a different container for each instrument)

f"CDKProcessingStepStateMachine-{processing_step_name}",
definition_body=definition_body,
state_machine_name=f"{processing_step_name}-step-function")
Empty file.
37 changes: 37 additions & 0 deletions sds_data_manager/lambda_code/SDSCode/instruments/l1b_Codice.py
Collaborator:

Is the plan for each step to get its own Lambda then?

Or, alternatively, is this generic enough that we could abstract it out into a single Lambda to which each step would just input the files it needs?

I'm thinking something like this could be a generic data_checker_lambda.py. Then you would iterate through the INSTRUMENT_SOURCES, or add some information to the context/event you trigger with that could be updated (I think you can control what you send via Step Functions, so this could possibly be abstracted over to there).

Contributor (Author):

I think what we settled on was a lambda for each instrument, but not each level.

@@ -0,0 +1,37 @@
"""Lambda runtime code that triggers off of arrival of data into S3 bucket.
"""
import os
from datetime import datetime

import boto3

#TODO: ability to access database, EFS, calibration data, etc.

def lambda_handler(event: dict, context):
"""Handler function"""
print(event)
print(context)

now = datetime.now()
print("Now time is")
print(now)
Collaborator:

log these so we get the time automatically?
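A minimal sketch using the standard logging module, which gets a timestamp on every record in CloudWatch; the logger setup shown is an assumption, not part of this PR:

import logging
from datetime import datetime

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def lambda_handler(event: dict, context):
    # CloudWatch stamps each log record with the invocation time automatically.
    logger.info("Received event: %s", event)
    logger.info("Now time is %s", datetime.now())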


# Get the environment variables
bucket = os.environ['PROCESSING_PATH']
Contributor:

I didn't know PROCESSING_PATH was the bucket name. Can we rename it to S3_BUCKET or something?

prefix = os.environ['INSTRUMENT_SOURCES']
Contributor:

rename this to S3_KEY_PATH?


# Retrieves objects in the S3 bucket under the given prefix
try:
s3 = boto3.client('s3')
object_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
Contributor:

Is the end goal for this to confirm that the files needed for this level of processing are returned in the object list? Or will that be handled by event bridge?

Contributor (Author):

We will do the following:

  1. Trigger the step function using EventBridge when new objects are created in the S3 bucket or EFS
  2. Each instrument will check whether enough data is available for processing using the lambda (this is where the trigger gets refined)
  3. Based on the results of the lambda, the Batch job will run

This will be worked out with the EventBridge code I'll work on next.
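For context, a heavily hedged sketch of step 1 on the CDK side; the rule name, event pattern, step_function variable, and use of SfnStateMachine as the target are assumptions, not part of this PR:

from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets

# Assumes EventBridge notifications are enabled on the bucket
# (event_bridge_enabled=True on the s3.Bucket construct).
rule = events.Rule(
    self, "ObjectCreatedRule",
    event_pattern=events.EventPattern(
        source=["aws.s3"],
        detail_type=["Object Created"],
        detail={"bucket": {"name": [archive_bucket.bucket_name]}},
    ),
)
rule.add_target(targets.SfnStateMachine(step_function.state_machine))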

print(object_list)
except KeyError:
print("No files present.")
Collaborator:

Should this alter our STATE below? If nothing found: FAIL.

I see, below you do add that as a TODO.
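A hedged sketch of propagating that into the returned STATE; the FAILURE value is an assumption, and anything other than SUCCESS would route the Choice state to the Fail state:

state = "SUCCESS"
try:
    s3 = boto3.client('s3')
    object_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
    print(object_list)
except KeyError:
    print("No files present.")
    state = "FAILURE"

return {
    "STATE": state,
    "JOB_NAME": os.environ['PROCESSING_NAME'],
    'COMMAND': ["packet-ingest"]
}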


#TODO: this state will change based on availability of data
#TODO: we need to think about what needs to be passed into the container
return {
"STATE": "SUCCESS",
Contributor:

Is the success state just indicating that the files exist?

Contributor (Author):

Yes. This is where each instrument will check to make certain that processing is ready.

"JOB_NAME": os.environ['PROCESSING_NAME'],
'COMMAND': ["packet-ingest"]
}
37 changes: 37 additions & 0 deletions sds_data_manager/lambda_code/SDSCode/instruments/l1c_Codice.py
@@ -0,0 +1,37 @@
"""Lambda runtime code that triggers off of arrival of data into S3 bucket.
"""
import os
from datetime import datetime

import boto3

#TODO: ability to access database, EFS, calibration data, etc.

def lambda_handler(event: dict, context):
"""Handler function"""
print(event)
print(context)

now = datetime.now()
print("Now time is")
print(now)

# Get the environment variables
bucket = os.environ['PROCESSING_PATH']
prefix = os.environ['INSTRUMENT_SOURCES']

# Retrieves objects in the S3 bucket under the given prefix
try:
s3 = boto3.client('s3')
object_list = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
print(object_list)
except KeyError:
print("No files present.")

#TODO: this state will change based on availability of data
#TODO: we need to think about what needs to be passed into the container
return {
"STATE": "SUCCESS",
"JOB_NAME": os.environ['PROCESSING_NAME'],
'COMMAND': ["packet-ingest"]
}