Dependency Association updates (#416)
- created a dependency class to store valid values
- refactored dependency code into its own file and updated the batch starter lambda code
- added a TODO for the other scenario in the batch starter lambda code
- wrote dependency code to return an API-like response for future work (see the sketch below)
tech3371 authored Dec 17, 2024
1 parent 7bf4cb4 commit 94793a3
Showing 6 changed files with 679 additions and 196 deletions.
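For context, here is a minimal sketch of the API-like request/response contract the new dependency lambda appears to follow, inferred from how batch_starter.py calls it in the diff below. Only the event keys and the statusCode/JSON-body response shape come from this commit; the handler body and its lookup table are hypothetical placeholders.

import json

def lambda_handler(event: dict, context) -> dict:
    # Hypothetical lookup table; the real class-based store lives in the
    # new dependency module introduced by this commit.
    dependencies = {
        ("swe", "l1b", "sci", "DOWNSTREAM", "HARD"): [
            {"data_source": "swe", "data_type": "l2", "descriptor": "sci"},
        ],
    }
    key = (
        event["data_source"],
        event["data_type"],
        event["descriptor"],
        event["dependency_type"],
        event["relationship"],
    )
    # API-like response: a status code plus a JSON-encoded body that the
    # caller decodes with json.loads().
    return {"statusCode": 200, "body": json.dumps(dependencies.get(key, []))}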
276 changes: 163 additions & 113 deletions sds_data_manager/lambda_code/SDSCode/pipeline_lambdas/batch_starter.py
@@ -5,13 +5,16 @@
from datetime import datetime

import boto3
import imap_data_access
from imap_data_access import ScienceFilePath
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError

from ..database import database as db
from ..database import models
from . import dependency_config

from ..pipeline_lambdas import dependency

# Logger setup
logger = logging.getLogger(__name__)
@@ -21,38 +24,6 @@
BATCH_CLIENT = boto3.client("batch", region_name="us-west-2")


def get_dependencies(node, direction, relationship):
"""Lookup the dependencies for the given ``node``.
A ``node`` is an identifier of the data product, which can be an
(instrument, data_level, descriptor) tuple, SPICE file identifiers,
or ancillary data file identifiers.
Parameters
----------
node : tuple
Quantities that uniquely identify a data product.
direction : str
Whether it's an UPSTREAM or DOWNSTREAM dependency.
relationship : str
Whether it's a HARD or SOFT dependency.
HARD means it's required and SOFT means it's nice to have.
Returns
-------
dependencies : list
List of dictionaries containing the dependency information.
"""
dependencies = dependency_config.DEPENDENCIES[relationship][direction].get(node, [])
# Add keys for a dict-like representation
dependencies = [
{"instrument": dep[0], "data_level": dep[1], "descriptor": dep[2]}
for dep in dependencies
]

return dependencies


def get_file(session, instrument, data_level, descriptor, start_date, version):
"""Query to database to get the first ScienceFiles record.
@@ -99,45 +70,6 @@ def get_file(session, instrument, data_level, descriptor, start_date, version):
return record


def get_downstream_dependencies(session, filename_components):
"""Get information of downstream dependents.
Parameters
----------
session : orm session
Database session.
filename_components : dict
Dictionary containing components of the filename.
Returns
-------
downstream_dependents : list of dict
List of dictionaries containing components with dates and versions appended.
"""
# Get downstream dependency data
downstream_dependents = get_dependencies(
node=(
filename_components["instrument"],
filename_components["data_level"],
filename_components["descriptor"],
),
direction="DOWNSTREAM",
relationship="HARD",
)

for dependent in downstream_dependents:
# TODO: query the version table here for appropriate version
# of each downstream_dependent.
dependent["version"] = filename_components["version"] # placeholder

# TODO: add repointing table query if dependent is ENA or GLOWS
# Use start_date to query repointing table.
# Add pointing number to dependent.
dependent["start_date"] = filename_components["start_date"]

return downstream_dependents


def is_job_in_processing_table(
session: db.Session,
instrument: str,
@@ -187,7 +119,7 @@ def is_job_in_processing_table(
return False


def try_to_submit_job(session, job_info):
def try_to_submit_job(session, job_info, start_date, version):
"""Try to submit a batch job with the given job information.
Go through the job information to retrieve all necessary input files
@@ -200,17 +132,19 @@ def try_to_submit_job(session, job_info):
Database session.
job_info : dict
Dictionary containing components with dates and versions appended.
start_date : str
Start date of the data.
version : str
Version of the data.
Returns
-------
bool
Whether or not this job is ready to be processed.
"""
instrument = job_info["instrument"]
data_level = job_info["data_level"]
instrument = job_info["data_source"]
data_level = job_info["data_type"]
descriptor = job_info["descriptor"]
start_date = job_info["start_date"]
version = job_info["version"]

logger.info("Checking for job in progress before looking for dependencies.")

@@ -229,38 +163,45 @@
return

# Find the files that this job depends on
upstream_dependencies = get_dependencies(
node=(instrument, data_level, descriptor),
direction="UPSTREAM",
relationship="HARD",
)
dependency_event_msg = {
"data_source": instrument,
"data_type": data_level,
"descriptor": descriptor,
"dependency_type": "UPSTREAM",
"relationship": "HARD",
}

# TODO: call dependency lambda when it's implemented
dependency_response = dependency.lambda_handler(dependency_event_msg, None)

    if dependency_response["statusCode"] != 200:
        logger.error(f"Dependency query failed with {dependency_response['body']}")
        return {"statusCode": 500, "body": "Dependency query failed"}

    upstream_dependencies = json.loads(dependency_response["body"])

for upstream_dependency in upstream_dependencies:
upstream_instrument = upstream_dependency["instrument"]
upstream_data_level = upstream_dependency["data_level"]
upstream_source = upstream_dependency["data_source"]
upstream_data_type = upstream_dependency["data_type"]
upstream_descriptor = upstream_dependency["descriptor"]

# Check to see if each upstream dependency file is available
# TODO: Update start_date / version request to be more specific
# Currently we are using the same as the job product, but the versions
# may not match exactly if one dependency updates before another
upstream_start_date = start_date
upstream_version = version
upstream_dependency.update(
{"start_date": upstream_start_date, "version": upstream_version}
)
record = get_file(
session,
upstream_instrument,
upstream_data_level,
upstream_source,
upstream_data_type,
upstream_descriptor,
upstream_start_date,
upstream_version,
)
if record is None:
logger.info(
f"Dependency not found: {upstream_instrument}, "
f"{upstream_data_level}, "
f"Dependency not found: {upstream_source}, "
f"{upstream_data_type}, "
f"{upstream_descriptor}, "
f"{upstream_start_date}, "
f"{upstream_version}"
@@ -292,14 +233,46 @@ def try_to_submit_job(session, job_info):
f"Wrote job INPROGRESS to Processing Jobs Table with id: {processing_job.id}"
)

# FYI, these are the keys the upstream_dependencies should contain:
# {
# TODO: change the upstream dependency keys as needed in the future based
# on needs. Right now, we are keeping same as before to reduce complexity.
# FYI, upstream_dependencies in the command below should contain these keys:
# 'instrument',
# 'data_level',
# 'descriptor',
# 'start_date',
# 'version'
# Example list of upstream_dependencies in the command below:
# [
# {
# 'instrument': 'swe',
# 'data_level': 'l0',
# 'descriptor': 'lveng-hk',
# 'data_level': 'l1b',
# 'descriptor': 'sci',
# 'start_date': '20231212',
# 'version': 'v001',
# },
# },
# {
# 'instrument': 'sc_attitude',
# 'data_level': 'spice',
# 'descriptor': 'historical',
# 'start_date': '20231212',
# 'version': '01',
# },
# ]

# Reformat the upstream dependencies from dependency call to match
# what batch job expects. Change 'data_source' to 'instrument' and
# 'data_type' to 'data_level'.
upstream_dependencies = [
{
"instrument": dep["data_source"],
"data_level": dep["data_type"],
"descriptor": dep["descriptor"],
"start_date": dep["start_date"],
"version": dep["version"],
}
for dep in upstream_dependencies
]

batch_command = [
"--instrument",
instrument,
@@ -337,27 +310,104 @@


def lambda_handler(events: dict, context):
"""Lambda handler."""
"""Lambda handler.
This lambda is triggered by different events.
1. Event of a new science or ancillary file arrival from the indexer lambda.
Example event:
{
"DetailType": "Processed File",
"Source": "imap.lambda",
"Detail": {
"object": {
"key": str,
"instrument": str,
}
}
}
TODO: We will need to add checks for ancillary files.
2. Event of a new SPICE file arrival from the SPICE indexer lambda.
TODO: This will be implemented in the future.
3. Event of a new science reprocessing.
TODO: This will be implemented in the future.
4. Event of bulk processing of science files during normal processing.
TODO: This will be implemented in the future.
Parameters
----------
events : dict
Event input
context : LambdaContext
Lambda context object
"""
logger.info(f"Events: {events}")
logger.info(f"Context: {context}")

with db.Session() as session:
# Since the SQS events can be batched together, we need to loop through
# each event. In this loop, "event" represents one file landing.
for event in events["Records"]:
# Event details:
logger.info(f"Individual event: {event}")
body = json.loads(event["body"])
# Since the SQS events can be batched together, we need to loop through
# each event. In this loop, "event" represents one file landing.
for event in events["Records"]:
# Event details:
logger.info(f"Individual event: {event}")
body = json.loads(event["body"])

filename = body["detail"]["object"]["key"]
logger.info(f"Retrieved filename: {filename}")
components = ScienceFilePath.extract_filename_components(filename)
filename = body["detail"]["object"]["key"]
logger.info(f"Retrieved filename: {filename}")

# Potential jobs are the instruments that depend on the current file.
potential_jobs = get_downstream_dependencies(session, components)
logger.info(
f"Potential jobs found [{len(potential_jobs)}]: {potential_jobs}"
dependency_event_msg = {
"dependency_type": "DOWNSTREAM",
"relationship": "HARD",
}

# TODO: decide how we want to set start date and version
# for SPICE, ancillary, or science files
# during reprocessing or bulk processing. Should we bring back
# end_date?
start_date = ""
version = ""

# TODO: How to handle repointing

# Try to create a science file first
file_obj = None

try:
file_obj = imap_data_access.ScienceFilePath(filename)
components = ScienceFilePath.extract_filename_components(filename)
start_date = components["start_date"]
version = components["version"]

dependency_event_msg.update(
{
"data_source": components["instrument"],
"data_type": components["data_level"],
"descriptor": components["descriptor"],
}
)
except imap_data_access.ScienceFilePath.InvalidScienceFileError as e:
    # No science file type matched. Log the exception message,
    # which indicates to the user how to fix the filename.
    logger.error(str(e))

# TODO: add ancillary file handling here
if file_obj is None:
raise ValueError(f"File handling {filename} is not implemented yet")

logger.info(f"Sending this event to dependency query: {dependency_event_msg}")
# Potential jobs are the instruments that depend on the current file,
# which are the downstream dependencies.
# TODO: call dependency lambda when it's implemented
dependency_response = dependency.lambda_handler(dependency_event_msg, None)

logger.info(f"Dependency query response: {dependency_response}")
if dependency_response["statusCode"] != 200:
    logger.error(f"Dependency query failed with {dependency_response['body']}")
    raise ValueError("Dependency query failed")

potential_jobs = json.loads(dependency_response["body"])

logger.info(f"Potential jobs found [{len(potential_jobs)}]: {potential_jobs}")

with db.Session() as session:
for job in potential_jobs:
try_to_submit_job(session, job)
try_to_submit_job(session, job, start_date, version)
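For reference, a hedged example of the SQS-wrapped event that would exercise the updated handler locally. The Records/body wrapping and the detail.object.key path follow the handler code above; the filename and field values are illustrative assumptions.

import json

# Illustrative science filename following the imap_data_access naming
# convention; the exact name is an assumption, not taken from the diff.
event = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "detail": {
                        "object": {
                            "key": "imap_swe_l1b_sci_20231212_v001.cdf",
                            "instrument": "swe",
                        }
                    }
                }
            )
        }
    ]
}

# lambda_handler(event, None) would extract the filename components, query
# the dependency lambda for DOWNSTREAM/HARD dependents, and call
# try_to_submit_job for each potential job returned.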