Skip to content

Commit

Permalink
remove and update SPICE files ingestion and how it's written to EFS (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tech3371 authored Nov 22, 2024
1 parent 9a008ce commit 1b7ae8b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 51 deletions.
6 changes: 3 additions & 3 deletions sds_data_manager/constructs/efs_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def __init__(
)

# This access point is used by other resources to read from EFS
lambda_mount_path = "/mnt/spice"
spice_mount_path = "/mnt/spice"

self.efs_spice_ingest_lambda = aws_lambda.Function(
self,
Expand All @@ -191,12 +191,12 @@ def __init__(
vpc=vpc,
# Mount EFS access point to /mnt/data within the lambda
filesystem=aws_lambda.FileSystem.from_efs_access_point(
efs_construct.spice_access_point, lambda_mount_path
efs_construct.spice_access_point, spice_mount_path
),
timeout=Duration.minutes(1),
architecture=aws_lambda.Architecture.ARM_64,
environment={
"EFS_MOUNT_PATH": lambda_mount_path,
"EFS_SPICE_MOUNT_PATH": spice_mount_path,
},
)

Expand Down
67 changes: 19 additions & 48 deletions sds_data_manager/lambda_code/efs_lambda/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Functions for EFS lambdas."""
"""Functions to write SPICE ingested files to EFS."""

import logging
import os
Expand All @@ -10,10 +10,7 @@
logger.setLevel(logging.INFO)

# Define the paths
mount_path = Path(os.getenv("EFS_MOUNT_PATH"))

attitude_symlink_path = mount_path / "latest_attitude_kernel.ah.a"
ephemeris_symlink_path = mount_path / "latest_ephemeris_kernel.bsp"
spice_mount_path = Path(os.getenv("EFS_SPICE_MOUNT_PATH")) # Eg. /mnt/spice


def create_symlink(source_path: Path, destination_path: Path) -> None:
Expand Down Expand Up @@ -45,58 +42,32 @@ def write_data_to_efs(s3_key: str, s3_bucket: str):
The S3 bucket
"""
filename = os.path.basename(s3_key)

# Create an S3 client
s3_client = boto3.client("s3")

# Download the file to the mount directory. Eg. /mnt/efs
download_path = mount_path / filename
# Remove 'spice/' prefix from the s3 key. See key example below.
# Eg. spice/spin/imap_2025_122_2025_122_02.spin.csv
# Keep remaining folder path after `spice/` to match the folder structure
# defined in imap-data-access library.
s3_folder_path = os.path.dirname(s3_key).replace("spice/", "")
filename = os.path.basename(s3_key)
# Download path to EFS
efs_spice_path = spice_mount_path / s3_folder_path

try:
# Download the file from S3
s3_client.download_file(s3_bucket, s3_key, download_path)
logger.debug(f"File downloaded: {download_path}")
# Create the folder if it does not exist
efs_spice_path.mkdir(parents=True, exist_ok=True)
# Download file from S3 to the EFS path
s3_client.download_file(s3_bucket, s3_key, efs_spice_path / filename)
logger.info(f"{s3_key} file downloaded successfully")
except Exception as e:
logger.error(f"Error downloading file: {e!s}")

logger.debug("After downloading file: %s", os.listdir(mount_path))

# TODO: we only want historical attitude kernels delivery
# following each track (3/wk)
# Make certain it does not start with imap_pred_ (only imap_)
# https://confluence.lasp.colorado.edu/display/IMAP/IMAP+POC+External+Reference+Documents
# pg 17 of 7516-9163 MOC Data Products Guide
# Historical attitude naming convention is this:
# imap_yyyy_doy_yyyy_doy_##.ah.bc and
# imap_yyyy_doy_yyyy_doy_##.ah.a
if (
filename.endswith(".ah.a")
or filename.endswith(".ah.bc")
and not filename.startswith("imap_pred_")
):
create_symlink(download_path, attitude_symlink_path)

# TODO: reconstructed would be ideal (change to imap_recon)
# Reconstructed delivered 1/wk
# But nom is least ideal (after burn and pred)
# Ephemeris naming convention is this:
# Eg. imap_nom_yyyymmdd_yyyymmdd_v##.bsp
# The reason we check startswith is because other ephemeris
# kernel has different prefix. Eg.
# imap_recon_yyyymmdd_yyyymmdd_v##.bsp
# imap_burn_yyyymmdd_yyyymmdd_v##.bsp
# imap_pred_yyyymmdd_yyyymmdd_v##.bsp
elif filename.startswith("imap_recon") and filename.endswith(".bsp"):
create_symlink(download_path, ephemeris_symlink_path)
elif filename.startswith("imap_burn") and filename.endswith(".bsp"):
create_symlink(download_path, ephemeris_symlink_path)
elif filename.startswith("imap_pred") and filename.endswith(".bsp"):
create_symlink(download_path, ephemeris_symlink_path)
logger.info("File was written to EFS path: %s", efs_spice_path)


def lambda_handler(event, context):
"""Lambda is triggered by eventbridge.
"""Lambda is triggered by eventbridge.
Input looks like this:
{
Expand All @@ -116,7 +87,7 @@ def lambda_handler(event, context):
"name": "sds-data-449431850278"
},
"object": {
"key": "imap_nom_20231024_20231025_v00.bsp",
"key": "spice/spin/imap_2025_122_2025_122_02.spin.csv",
"size": 8,
"etag": "fd33e2e8ad3cb1bdd3ea8f5633fcf5c7",
"version-id": "w9eElv_lFFeEbifMabOBHjtJl9Ori_At",
Expand Down Expand Up @@ -146,7 +117,7 @@ def lambda_handler(event, context):
# Retrieve the S3 bucket and key from the event
s3_bucket = event["detail"]["bucket"]["name"]
s3_key = event["detail"]["object"]["key"]
logger.debug(event)
logger.info(event)

write_data_to_efs(s3_key, s3_bucket)

Expand Down

0 comments on commit 1b7ae8b

Please sign in to comment.