Indexer lambda update post demo #253

Merged
14 changes: 9 additions & 5 deletions sds_data_manager/lambda_code/SDSCode/database/models.py
@@ -10,7 +10,6 @@
Boolean,
Column,
DateTime,
ForeignKey,
Identity,
Integer,
String,
@@ -114,8 +113,15 @@ class StatusTracking(Base):
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
file_path_to_create = Column(String, nullable=False)
status = Column(STATUSES, nullable=False)
# TODO:
# Didn't make these required fields yet. Revisit
# post discussion.
instrument = Column(INSTRUMENTS)
data_level = Column(DATA_LEVELS)
job_definition = Column(String)
ingestion_date = Column(DateTime)
job_log_stream_id = Column(String)
container_image = Column(String)
container_command = Column(String)


class FileCatalog(Base):
@@ -144,9 +150,7 @@ class FileCatalog(Base):
end_date = Column(DateTime, nullable=False)
version = Column(String(8), nullable=False)
extension = Column(EXTENSIONS, nullable=False)
status_tracking_id = Column(
    Integer, ForeignKey("status_tracking.id"), nullable=False
)
ingestion_date = Column(DateTime)


class PreProcessingDependency(Base):
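Since the ForeignKey from FileCatalog to StatusTracking is removed above, the two tables are now correlated by file path rather than by a status_tracking_id join. A minimal sketch of a cross-table lookup under the new schema (the file_path column on FileCatalog and the db.get_engine() helper are assumed from their use elsewhere in this PR; the import path is also an assumption):

# Sketch only: correlate catalog and status rows by file path now that
# the FK is gone. Assumes models.FileCatalog.file_path and
# db.get_engine() as used elsewhere in this PR.
from sqlalchemy import select
from sqlalchemy.orm import Session

from .database import db, models  # import path assumed


def lookup_file(file_path: str):
    """Fetch the catalog row and the status row for one file path."""
    with Session(db.get_engine()) as session:
        catalog_row = session.execute(
            select(models.FileCatalog).where(
                models.FileCatalog.file_path == file_path
            )
        ).scalar_one_or_none()
        status_row = session.execute(
            select(models.StatusTracking).where(
                models.StatusTracking.file_path_to_create == file_path
            )
        ).scalar_one_or_none()
        return catalog_row, status_row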
246 changes: 93 additions & 153 deletions sds_data_manager/lambda_code/SDSCode/indexer.py
@@ -1,4 +1,3 @@
import datetime
import json
import logging
import os
@@ -36,17 +35,13 @@

# Retrieve the metadata of the object
bucket_name = os.environ.get("S3_DATA_BUCKET")
key = file_path
logger.info(f"bucket_name: {bucket_name}")
logger.info(f"key: {key}")
logger.info(f"looking up ingestion date for {file_path}")

response = s3_client.head_object(Bucket=bucket_name, Key=key)
response = s3_client.head_object(Bucket=bucket_name, Key=file_path)
file_creation_date = response["LastModified"]

# time looks like this:
# LastModified looks like this:
# 2024-01-25 23:35:26+00:00
# Formats the datetime object to a string with the format "%Y%m%d".
# ingestion_data_str = file_creation_date.strftime("%Y%m%d")
return file_creation_date
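For context, head_object returns LastModified as a timezone-aware datetime, so this helper now hands the datetime object straight back instead of round-tripping through strings. A hypothetical call (the bucket comes from the S3_DATA_BUCKET env var; the key below is made up):

# Hypothetical usage; the key is invented and S3_DATA_BUCKET must be set.
creation_date = get_file_creation_date(
    "imap/swe/l0/2024/01/imap_swe_l0_raw_20240125_v001.pkts"  # made-up key
)
print(creation_date)                     # e.g. 2024-01-25 23:35:26+00:00
print(creation_date.strftime("%Y%m%d"))  # "20240125"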


@@ -168,6 +163,10 @@
def s3_event_handler(event):
"""Handler function for S3 events.

The S3 event handler takes an S3 event and writes file information
to the file catalog table. Once the write to the database finishes,
it sends an event to the batch starter lambda.

Parameters
----------
event : dict
@@ -186,72 +185,36 @@
# TODO: add checks for SPICE or other
# data types

try:
science_file = ScienceFilepathManager(filename)
except InvalidScienceFileError as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))

# Check if the file is a valid science file or not
# TODO: change these lines once filename validator
# is implemented on sds-data-access repo and released
science_file = ScienceFilepathManager(filename)
Contributor:
Is now available in imap-data-access v0.3.0! If you bump the required version in pyproject.toml you can use that class instead.

Contributor Author:
Thank you! I will do that in a follow-up PR.
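For that follow-up, the swap might look like the sketch below; the import path, class name, and method name from imap-data-access v0.3.0 are assumptions here, not verified against that release:

# Assumed imap-data-access v0.3.0 API; names are unverified, so treat
# this purely as a sketch of the intended follow-up.
import imap_data_access

# Inside s3_event_handler, replacing the local ScienceFilepathManager:
science_file = imap_data_access.ScienceFilePath(filename)   # name assumed
metadata_params = science_file.get_file_metadata_params()   # method assumed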

# setup a dictionary of metadata parameters to unpack in the
# file catalog table
# file catalog table, e.g.:
# {
# "file_path": None,
# "instrument": self.instrument,
# "data_level": self.data_level,
# "descriptor": self.descriptor,
# "start_date": datetime.strptime(self.startdate, "%Y%m%d"),
# "end_date": datetime.strptime(self.enddate, "%Y%m%d"),
# "version": self.version,
# "extension": self.extension,
# "ingestion_date": date_object,
# }
metadata_params = science_file.get_file_metadata_params()
metadata_params["file_path"] = s3_filepath

if metadata_params["data_level"] != "l0":
logger.error("Invalid data level. Expected l0")
return http_response(status_code=400, body="Invalid data level")

# Add data to the file catalog and status tables
# create status params
# Event time looks like this:
# "time": "2024-01-16T17:35:08Z"
# Parses the time string from the event to a datetime object.
ingestion_date = datetime.datetime.strptime(event["time"], "%Y-%m-%dT%H:%M:%SZ")
# Formats the datetime object to a string with the format "%Y%m%d".
ingestion_data_str = ingestion_date.strftime("%Y%m%d")
# Parses the newly formatted string back into a datetime
# object with the desired structure.
ingestion_date_object = datetime.datetime.strptime(ingestion_data_str, "%Y%m%d")
status_params = {
"file_path_to_create": s3_filepath,
"status": models.Status.SUCCEEDED,
"job_definition": None,
"ingestion_date": ingestion_date_object,
}
try:
logger.info(f"Inserting {filename} into database")
update_status_table(status_params)
logger.info("Wrote data to status table")
except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))

# Query to get foreign key id for catalog table
status_tracking = None
with Session(db.get_engine()) as session:
# Query to get foreign key id for catalog table
query = select(models.StatusTracking.__table__).where(
models.StatusTracking.file_path_to_create == s3_filepath
)

status_tracking = session.execute(query).first()
ingestion_date_object = get_file_creation_date(s3_filepath)

if status_tracking is None:
logger.error("No status tracking record found")
return http_response(status_code=400, body="No status tracking record found")
logger.info(f"Found record in status table associated with - {filename}")
try:
metadata_params["status_tracking_id"] = status_tracking.id
update_file_catalog_table(metadata_params)
logger.info("Wrote data to file catalog table")
except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))
metadata_params["ingestion_date"] = ingestion_date_object
update_file_catalog_table(metadata_params)
logger.info("Wrote data to file catalog table")
Collaborator:
info() calls should probably contain some more information about what is going on, like the filepath or the ingestion date you created, if you want to know that. This seems more like a debug level that we don't really care about in normal logs, but may want to see how it progresses if we increase the logging level.

Suggested change:
logger.info("Wrote data to file catalog table")
logger.debug("Wrote data to file catalog table")

# Send L0 event from this lambda for Batch starter
# Send event from this lambda for Batch starter
# lambda
send_event_from_indexer(filename)
return http_response(status_code=200, body="Success")
logger.debug("S3 event handler complete")


def batch_event_handler(event):
@@ -297,77 +260,48 @@
"""
command = event["detail"]["container"]["command"]

# Get event inputs ready
file_path = command[5]
filename = os.path.basename(file_path)

# TODO: post demo, revisit this and improve it
if event["detail"]["status"] == "SUCCEEDED":
# First write to status table and then
# write to file catalog with foreign key
# information
try:
# query and update status table record with new
# information from batch
with Session(db.get_engine()) as session:
# Had to query this way because the select statement
# returns a RowProxy object when it executes it,
# not the actual StatusTracking model instance,
# which is why it can't update table row directly.
result = (
session.query(models.StatusTracking)
.filter(models.StatusTracking.file_path_to_create == file_path)
.first()
)
# Get filename from batch job command
file_path_to_create = command[5]
# Get job status
job_status = (
models.Status.SUCCEEDED
if event["detail"]["status"] == "SUCCEEDED"
else models.Status.FAILED
)

# update three fields with updated information
result.status = models.Status.SUCCEEDED
result.job_definition = event["detail"]["jobDefinition"]
result.ingestion_date = get_file_creation_date(file_path)
session.commit()

# Then write to file catalog table
sci_file = ScienceFilepathManager(filename)
metadata_params = sci_file.get_file_metadata_params()
metadata_params["file_path"] = file_path
metadata_params["status_tracking_id"] = result.id
update_file_catalog_table(metadata_params)
# Send event from this lambda for Batch starter
# lambda
response = send_event_from_indexer(filename)
logger.info(f"Sent event to EventBridge - {response}")

except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))

elif event["detail"]["status"] == "FAILED":
try:
# Update only status table with Failed status
with Session(db.get_engine()) as session:
# Had to query this way because the select statement
# returns a RowProxy object when it executes it,
# not the actual StatusTracking model instance,
# which is why it can't update table row directly.
result = (
session.query(models.StatusTracking)
.filter(models.StatusTracking.file_path_to_create == file_path)
.first()
)
with Session(db.get_engine()) as session:
# Query with session.query() rather than select() because executing
# a select() returns Row objects, not StatusTracking model
# instances, so the result couldn't be updated directly.
result = (
session.query(models.StatusTracking)
.filter(models.StatusTracking.file_path_to_create == file_path_to_create)
.first()
)

# update two fields with updated information
result.status = models.Status.FAILED
result.job_definition = event["detail"]["jobDefinition"]
session.commit()
except Exception as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))
if result is None:
Collaborator:
If you try to insert into the db, does it fail and return the object that is already there because it wasn't unique? I'm wondering if you could try to insert the object into the database right away and then just update whatever object comes out of it, rather than having to query() to find the object.

Contributor Author:
Yes. I tried to avoid the query, and the insert-first approach fails to update.
logger.info(
    "No existing record found, creating"
    f" new record for {file_path_to_create}"
)
status_params = {
"file_path_to_create": file_path_to_create,
"status": job_status,
}
update_status_table(status_params)
result = (
session.query(models.StatusTracking)
.filter(
models.StatusTracking.file_path_to_create == file_path_to_create
)
.first()
)

else:
# Technically, we shouldn't get other job status since event
# bridge filters out only succeeded or failed status.
logger.error("Unknown batch job status")
return http_response(status_code=400, body="Unknown batch job status")
result.status = job_status
result.job_definition = event["detail"]["jobDefinition"]
# TODO: get other information post discussion
session.commit()

return http_response(status_code=200, body="Success")

@@ -403,28 +337,39 @@
filename = os.path.basename(file_path_to_create)
logger.info(f"Attempting to insert {filename} into database")

try:
_ = ScienceFilepathManager(filename)
except InvalidScienceFileError as e:
logger.error(str(e))
return http_response(status_code=400, body=str(e))
ScienceFilepathManager(filename)

# Write event information to status tracking table.
logger.info(f"Inserting {filename} into database")
status_params = {
"file_path_to_create": file_path_to_create,
"status": models.Status.INPROGRESS,
"job_definition": None,
"ingestion_date": None,
}
try:
update_status_table(status_params)
except Exception as e:
return http_response(status_code=400, body=str(e))
update_status_table(status_params)

logger.debug("Wrote data to status tracking table")
return http_response(status_code=200, body="Success")


# Handlers mapping
event_handlers = {
"aws.s3": s3_event_handler,
"aws.batch": batch_event_handler,
"imap.lambda": custom_event_handler,
}


def handle_event(event, handler):
    """Common event handling logic."""
    try:
        handler(event)
        return http_response(status_code=200, body="Success")
    except InvalidScienceFileError as e:
        logger.error(str(e))
        return http_response(status_code=400, body=str(e))

Contributor:
Nice, I like this change

Codecov / codecov/patch check warning on line 370 in sds_data_manager/lambda_code/SDSCode/indexer.py: added lines #L368 - L370 were not covered by tests.
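Those uncovered lines are the InvalidScienceFileError branch in handle_event; a test along the lines of this sketch would exercise it (the raising handler is a stand-in, the import path is assumed, and the assertion assumes http_response builds a Lambda-style dict with a statusCode key):

# Hypothetical test for the uncovered except-branch in handle_event.
from SDSCode import indexer  # import path assumed


def test_handle_event_returns_400_on_invalid_file():
    def raising_handler(event):
        raise indexer.InvalidScienceFileError("bad filename")

    response = indexer.handle_event({}, raising_handler)
    # Assumes http_response produces a Lambda-style response dict.
    assert response["statusCode"] == 400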


def lambda_handler(event, context):
"""Handler function for creating metadata, adding it to the database.

Expand All @@ -443,16 +388,11 @@
and runtime environment.
"""
logger.info("Received event: " + json.dumps(event, indent=2))
source = event.get("source")

# L0 data or other data type such as SPICE data ingestion
if event["source"] == "aws.s3":
return s3_event_handler(event)
# Data processing status
elif event["source"] == "aws.batch":
return batch_event_handler(event)
# Data processing initiation
elif event["source"] == "imap.lambda":
return custom_event_handler(event)
handler = event_handlers.get(source)
if handler:
return handle_event(event, handler)
else:
logger.error("Unknown event source")
return http_response(status_code=400, body="Unknown event source")
6 changes: 4 additions & 2 deletions sds_data_manager/lambda_code/SDSCode/query_api.py
@@ -42,7 +42,7 @@ def lambda_handler(event, context):
valid_parameters = [
column.key
for column in models.FileCatalog.__table__.columns
if column.key not in ["id", "status_tracking_id"]
if column.key not in ["id"]
]
# go through each query parameter to set up sqlalchemy query conditions
for param, value in query_params.items():
@@ -90,8 +90,10 @@ def lambda_handler(event, context):
for result in search_results:
result["start_date"] = result["start_date"].strftime("%Y%m%d")
result["end_date"] = result["end_date"].strftime("%Y%m%d")
result["ingestion_date"] = result["ingestion_date"].strftime(
"%Y-%m-%d %H:%M:%S%z"
)
del result["id"]
del result["status_tracking_id"]

logger.info(
"Found [%s] Query Search Results: %s", len(search_results), str(search_results)