Indexer lambda update post demo (#253)
- Update S3 event rule to listen to the whole imap/ folder (see the pattern sketch below)
- Decouple the StatusTracking and FileCatalog tables
- Update event handlers to reflect the above changes
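For illustration, the widened rule amounts to matching on the imap/ key prefix in the EventBridge event pattern; a minimal sketch, assuming S3 notifications are routed through EventBridge (the bucket name is made up, not taken from this commit):

    # Hypothetical pattern: one rule now covers every object created under
    # imap/, instead of separate rules per instrument subfolder.
    s3_event_pattern = {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": ["sds-data-bucket"]},
            "object": {"key": [{"prefix": "imap/"}]},
        },
    }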
tech3371 authored Feb 14, 2024
1 parent 8cc3735 commit 5a057ea
Showing 7 changed files with 165 additions and 225 deletions.
14 changes: 9 additions & 5 deletions sds_data_manager/lambda_code/SDSCode/database/models.py
@@ -10,7 +10,6 @@
Boolean,
Column,
DateTime,
ForeignKey,
Identity,
Integer,
String,
@@ -114,8 +113,15 @@ class StatusTracking(Base):
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
file_path_to_create = Column(String, nullable=False)
status = Column(STATUSES, nullable=False)
# TODO: these columns are not required fields yet;
# revisit post discussion
instrument = Column(INSTRUMENTS)
data_level = Column(DATA_LEVELS)
job_definition = Column(String)
ingestion_date = Column(DateTime)
job_log_stream_id = Column(String)
container_image = Column(String)
container_command = Column(String)


class FileCatalog(Base):
@@ -144,9 +150,7 @@ class FileCatalog(Base):
end_date = Column(DateTime, nullable=False)
version = Column(String(8), nullable=False)
extension = Column(EXTENSIONS, nullable=False)
status_tracking_id = Column(
Integer, ForeignKey("status_tracking.id"), nullable=False
)
ingestion_date = Column(DateTime)


class PreProcessingDependency(Base):
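With the ForeignKey removed above, StatusTracking and FileCatalog can be written independently; a minimal sketch of the decoupled write path, assuming the models in this file (the engine and the file path are illustrative):

    from datetime import datetime

    from sqlalchemy.orm import Session

    # A FileCatalog row no longer requires a StatusTracking row to exist
    # first, so each handler writes only the table it owns.
    with Session(engine) as session:  # engine: assumed pre-built SQLAlchemy engine
        session.add(models.FileCatalog(
            file_path="imap/mag/l0/imap_mag_l0_raw_20240214_v001.pkts",
            instrument="mag",
            data_level="l0",
            descriptor="raw",
            start_date=datetime(2024, 2, 14),
            end_date=datetime(2024, 2, 14),
            version="v001",
            extension="pkts",
            ingestion_date=datetime(2024, 2, 14, 12, 0),
        ))
        session.commit()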
246 changes: 93 additions & 153 deletions sds_data_manager/lambda_code/SDSCode/indexer.py
@@ -1,4 +1,3 @@
import datetime
import json
import logging
import os
@@ -36,17 +35,13 @@ def get_file_creation_date(file_path):

# Retrieve the metadata of the object
bucket_name = os.environ.get("S3_DATA_BUCKET")
key = file_path
logger.info(f"bucket_name: {bucket_name}")
logger.info(f"key: {key}")
logger.info(f"looking up ingestion date for {file_path}")

response = s3_client.head_object(Bucket=bucket_name, Key=key)
response = s3_client.head_object(Bucket=bucket_name, Key=file_path)
file_creation_date = response["LastModified"]

# time looks like this:
# LastModified looks like this:
# 2024-01-25 23:35:26+00:00
# Formats the datetime object to a string with the format "%Y%m%d".
# ingestion_data_str = file_creation_date.strftime("%Y%m%d")
return file_creation_date
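
For reference, head_object returns LastModified as a timezone-aware datetime, which is what now gets stored as the ingestion date; a minimal sketch with made-up bucket and key names:

    import boto3

    s3_client = boto3.client("s3")
    # LastModified is timezone-aware, e.g. 2024-01-25 23:35:26+00:00
    response = s3_client.head_object(
        Bucket="sds-data-bucket",
        Key="imap/mag/l0/imap_mag_l0_raw_20240125_v001.pkts",
    )
    ingestion_date = response["LastModified"]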


@@ -168,6 +163,10 @@ def send_event_from_indexer(filename):
def s3_event_handler(event):
"""Handler function for S3 events.
S3 event handler takes s3 event and then writes information to
file catalog table. It also sends event to the batch starter
lambda once it finishes writing information to database.
Parameters
----------
event : dict
@@ -186,72 +185,36 @@ def s3_event_handler(event):
# TODO: add checks for SPICE or other
# data types

try:
science_file = ScienceFilepathManager(filename)
except InvalidScienceFileError as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))

# Check if the file is a valid science file or not
# TODO: change these lines once filename validator
# is implemented on sds-data-access repo and released
science_file = ScienceFilepathManager(filename)
# setup a dictionary of metadata parameters to unpack in the
# file catalog table
# file catalog table, e.g.
# {
# "file_path": None,
# "instrument": self.instrument,
# "data_level": self.data_level,
# "descriptor": self.descriptor,
# "start_date": datetime.strptime(self.startdate, "%Y%m%d"),
# "end_date": datetime.strptime(self.enddate, "%Y%m%d"),
# "version": self.version,
# "extension": self.extension,
# "ingestion_date": date_object,
# }
metadata_params = science_file.get_file_metadata_params()
metadata_params["file_path"] = s3_filepath

if metadata_params["data_level"] != "l0":
logger.error("Invalid data level. Expected l0")
return http_response(status_code=400, body="Invalid data level")

# Add data to the file catalog and status tables
# create status params
# Event time looks like this:
# "time": "2024-01-16T17:35:08Z"
# Parses the time string from the event to a datetime object.
ingestion_date = datetime.datetime.strptime(event["time"], "%Y-%m-%dT%H:%M:%SZ")
# Formats the datetime object to a string with the format "%Y%m%d".
ingestion_data_str = ingestion_date.strftime("%Y%m%d")
# Parses the newly formatted string back into a datetime
# object with the desired structure.
ingestion_date_object = datetime.datetime.strptime(ingestion_data_str, "%Y%m%d")
status_params = {
"file_path_to_create": s3_filepath,
"status": models.Status.SUCCEEDED,
"job_definition": None,
"ingestion_date": ingestion_date_object,
}
try:
logger.info(f"Inserting {filename} into database")
update_status_table(status_params)
logger.info("Wrote data to status table")
except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))

# Query to get foreign key id for catalog table
status_tracking = None
with Session(db.get_engine()) as session:
# Query to get foreign key id for catalog table
query = select(models.StatusTracking.__table__).where(
models.StatusTracking.file_path_to_create == s3_filepath
)

status_tracking = session.execute(query).first()
ingestion_date_object = get_file_creation_date(s3_filepath)

if status_tracking is None:
logger.error("No status tracking record found")
return http_response(status_code=400, body="No status tracking record found")
logger.info(f"Found record in status table associated with - {filename}")
try:
metadata_params["status_tracking_id"] = status_tracking.id
update_file_catalog_table(metadata_params)
logger.info("Wrote data to file catalog table")
except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))
metadata_params["ingestion_date"] = ingestion_date_object
update_file_catalog_table(metadata_params)
logger.info("Wrote data to file catalog table")

# Send L0 event from this lambda for Batch starter
# Send event from this lambda for Batch starter
# lambda
send_event_from_indexer(filename)
return http_response(status_code=200, body="Success")
logger.debug("S3 event handler complete")
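
For context, an EventBridge S3 event of the shape this handler consumes looks roughly like the following (all values are illustrative):

    # Hypothetical "Object Created" event; the handler pulls the key out
    # of detail.object and derives the filename from it.
    sample_s3_event = {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "time": "2024-02-14T17:35:08Z",
        "detail": {
            "bucket": {"name": "sds-data-bucket"},
            "object": {"key": "imap/mag/l0/imap_mag_l0_raw_20240214_v001.pkts"},
        },
    }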


def batch_event_handler(event):
@@ -297,77 +260,48 @@ def batch_event_handler(event):
"""
command = event["detail"]["container"]["command"]

# Get event inputs ready
file_path = command[5]
filename = os.path.basename(file_path)

# TODO: post demo, revisit this and improve it
if event["detail"]["status"] == "SUCCEEDED":
# First write to the status table and then
# write to the file catalog with foreign key
# information
try:
# query and update status table record with new
# information from batch
with Session(db.get_engine()) as session:
# Had to query this way because the select statement
# returns a RowProxy object when it executes it,
# not the actual StatusTracking model instance,
# which is why it can't update table row directly.
result = (
session.query(models.StatusTracking)
.filter(models.StatusTracking.file_path_to_create == file_path)
.first()
)
# Get file path from batch job command
file_path_to_create = command[5]
# Get job status
job_status = (
models.Status.SUCCEEDED
if event["detail"]["status"] == "SUCCEEDED"
else models.Status.FAILED
)

# update three fields with updated information
result.status = models.Status.SUCCEEDED
result.job_definition = event["detail"]["jobDefinition"]
result.ingestion_date = get_file_creation_date(file_path)
session.commit()

# Then write to file catalog table
sci_file = ScienceFilepathManager(filename)
metadata_params = sci_file.get_file_metadata_params()
metadata_params["file_path"] = file_path
metadata_params["status_tracking_id"] = result.id
update_file_catalog_table(metadata_params)
# Send event from this lambda for Batch starter
# lambda
response = send_event_from_indexer(filename)
logger.info(f"Sent event to EventBridge - {response}")

except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))

elif event["detail"]["status"] == "FAILED":
try:
# Update only status table with Failed status
with Session(db.get_engine()) as session:
# Had to query this way because the select statement
# returns a RowProxy object when it executes it,
# not the actual StatusTracking model instance,
# which is why it can't update table row directly.
result = (
session.query(models.StatusTracking)
.filter(models.StatusTracking.file_path_to_create == file_path)
.first()
)
with Session(db.get_engine()) as session:
# Had to query this way because the select statement
# returns a RowProxy object when it executes it,
# not the actual StatusTracking model instance,
# which is why it can't update table row directly.
result = (
session.query(models.StatusTracking)
.filter(models.StatusTracking.file_path_to_create == file_path_to_create)
.first()
)

# update two fields with updated information
result.status = models.Status.FAILED
result.job_definition = event["detail"]["jobDefinition"]
session.commit()
except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))
if result is None:
logger.info(
"No existing record found, creating"
f" new record for {file_path_to_create}"
)
status_params = {
"file_path_to_create": file_path_to_create,
"status": job_status,
}
update_status_table(status_params)
result = (
session.query(models.StatusTracking)
.filter(
models.StatusTracking.file_path_to_create == file_path_to_create
)
.first()
)

else:
# Technically, we shouldn't get another job status since
# EventBridge filters for only SUCCEEDED or FAILED statuses.
logger.error("Unknown batch job status")
return http_response(status_code=400, body="Unknown batch job status")
result.status = job_status
result.job_definition = event["detail"]["jobDefinition"]
# TODO: get other information post discussion
session.commit()

return http_response(status_code=200, body="Success")
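
For context, a Batch job state-change event of the shape this handler consumes looks roughly like this (job definition ARN and command are illustrative; note the file path sits at command[5], matching the lookup above):

    # Hypothetical "Batch Job State Change" event.
    sample_batch_event = {
        "source": "aws.batch",
        "detail-type": "Batch Job State Change",
        "detail": {
            "status": "SUCCEEDED",  # or "FAILED"
            "jobDefinition": "arn:aws:batch:us-west-2:111122223333:job-definition/sds-processing:1",
            "container": {
                "command": [
                    "--instrument", "mag",
                    "--level", "l1a",
                    "--file", "imap/mag/l1a/imap_mag_l1a_raw_20240214_v001.cdf",
                ],
            },
        },
    }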

@@ -403,28 +337,39 @@ def custom_event_handler(event):
filename = os.path.basename(file_path_to_create)
logger.info(f"Attempting to insert {filename} into database")

try:
_ = ScienceFilepathManager(filename)
except InvalidScienceFileError as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))
ScienceFilepathManager(filename)

# Write event information to status tracking table.
logger.info(f"Inserting {filename} into database")
status_params = {
"file_path_to_create": file_path_to_create,
"status": models.Status.INPROGRESS,
"job_definition": None,
"ingestion_date": None,
}
try:
update_status_table(status_params)
except Exception as e:
return http_response(status_code=400, body=str(e))
update_status_table(status_params)

logger.debug("Wrote data to status tracking table")
return http_response(status_code=200, body="Success")


# Handlers mapping
event_handlers = {
"aws.s3": s3_event_handler,
"aws.batch": batch_event_handler,
"imap.lambda": custom_event_handler,
}


def handle_event(event, handler):
"""Common event handling logic."""
try:
handler(event)
return http_response(status_code=200, body="Success")
except InvalidScienceFileError as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))


def lambda_handler(event, context):
"""Handler function for creating metadata, adding it to the database.
@@ -443,16 +388,11 @@ def lambda_handler(event, context):
and runtime environment.
"""
logger.info("Received event: " + json.dumps(event, indent=2))
source = event.get("source")

# L0 data or other data type such as SPICE data ingestion
if event["source"] == "aws.s3":
return s3_event_handler(event)
# Data processing status
elif event["source"] == "aws.batch":
return batch_event_handler(event)
# Data processing initiation
elif event["source"] == "imap.lambda":
return custom_event_handler(event)
handler = event_handlers.get(source)
if handler:
return handle_event(event, handler)
else:
logger.error("Unknown event source")
return http_response(status_code=400, body="Unknown event source")
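
The dispatch-table refactor also makes local testing straightforward; a quick sanity check, assuming the sample events sketched above (the Lambda context argument is unused, so None stands in):

    # Known sources route to their handler; anything else hits the 400 branch.
    print(lambda_handler(sample_s3_event, None))
    print(lambda_handler({"source": "aws.ec2"}, None))  # -> 400 Unknown event source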
6 changes: 4 additions & 2 deletions sds_data_manager/lambda_code/SDSCode/query_api.py
@@ -42,7 +42,7 @@ def lambda_handler(event, context):
valid_parameters = [
column.key
for column in models.FileCatalog.__table__.columns
if column.key not in ["id", "status_tracking_id"]
if column.key not in ["id"]
]
# go through each query parameter to set up sqlalchemy query conditions
for param, value in query_params.items():
@@ -90,8 +90,10 @@ def lambda_handler(event, context):
for result in search_results:
result["start_date"] = result["start_date"].strftime("%Y%m%d")
result["end_date"] = result["end_date"].strftime("%Y%m%d")
result["ingestion_date"] = result["ingestion_date"].strftime(
"%Y-%m-%d %H:%M:%S%z"
)
del result["id"]
del result["status_tracking_id"]

logger.info(
"Found [%s] Query Search Results: %s", len(search_results), str(search_results)
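Worth noting what the new format string produces, since %z emits the UTC offset without a colon; a minimal standard-library check:

    from datetime import datetime, timezone

    ts = datetime(2024, 1, 25, 23, 35, 26, tzinfo=timezone.utc)
    print(ts.strftime("%Y-%m-%d %H:%M:%S%z"))  # -> 2024-01-25 23:35:26+0000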