Skip to content

Commit

Permalink
Add Metadata Ingest (#210)
Browse files Browse the repository at this point in the history
* added initial metadata schema

* fixed inheritance for common metadata table

* added file ingest to database

* added FileParser error status handling in indexer

* added string cap for level and instrument

* added more to table TODO

* updated comments

* changed parameter name in indexer for RDS

* fixed table naming issues

* created model lookup dict
  • Loading branch information
sdhoyt authored Jan 8, 2024
1 parent 6d8ea17 commit 47b8a9a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
8 changes: 4 additions & 4 deletions sds_data_manager/lambda_code/SDSCode/database/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Main file to store schema definition"""
from sqlalchemy import Boolean, Column, DateTime, Integer, String
from sqlalchemy import Boolean, Column, DateTime, Identity, Integer, String
from sqlalchemy.orm import DeclarativeBase


Expand All @@ -25,18 +25,18 @@ class UniversalSpinTable(Base):


class FileCatalogTable:
"""Common File Catalog table"""
"""Common file catalog table"""

# TODO: determine cap for strings
id = Column(Integer, primary_key=True)
id = Column(Integer, Identity(start=1, increment=1), primary_key=True)
file_path = Column(String, nullable=False)
instrument = Column(String(6), nullable=False)
data_level = Column(String(3), nullable=False)
descriptor = Column(String, nullable=False)
start_date = Column(DateTime(timezone=True), nullable=False)
end_date = Column(DateTime(timezone=True), nullable=False)
ingestion_date = Column(DateTime(timezone=True), nullable=False)
version = Column(Integer, nullable=False)
version = Column(String, nullable=False)
extension = Column(String, nullable=False)


Expand Down
57 changes: 46 additions & 11 deletions sds_data_manager/lambda_code/SDSCode/indexer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
# Standard
import datetime
import json
import logging
import os
import sys

# Installed
import boto3
from SDSCode.database import models
from SDSCode.database.database import engine
from sqlalchemy.orm import Session

# Local
from .path_helper import FilenameParser
Expand Down Expand Up @@ -47,17 +51,48 @@ def lambda_handler(event, context):
filename = record["s3"]["object"]["key"]

logger.info(f"Attempting to insert {os.path.basename(filename)} into database")
# TODO: change below logics to use new FilenameParser
# when we create schema and write file metadata to DB
filename_parsed = FilenameParser(filename)
filename_parsed.upload_filepath()
metadata = None
filepath = filename_parsed.upload_filepath()

# TODO: remove this check since upload api validates filename?
# Found nothing. This should probably send out an error notification
# to the team, because how did it make its way onto the SDS?
if metadata is None:
logger.info("Found no matching file types to index this file against.")
return None
# confirm that the file is valid
if filepath["statusCode"] != 200:
logger.error(filepath["body"])
break

logger.info("Found the following metadata to index: " + str(metadata))
# set up a dictionary of metadata parameters to unpack into the
# instrument table model
metadata_params = {
"file_path": filepath["body"],
"instrument": filename_parsed.instrument,
"data_level": filename_parsed.data_level,
"descriptor": filename_parsed.descriptor,
"start_date": filename_parsed.startdate,
"end_date": filename_parsed.enddate,
"ingestion_date": datetime.datetime.now(datetime.timezone.utc),
"version": filename_parsed.version,
"extension": filename_parsed.extension,
}

# The model lookup is used to match the instrument data
# to the correct postgres table based on the instrument name.
model_lookup = {
"lo": models.LoTable,
"hi": models.HiTable,
"ultra": models.UltraTable,
"hit": models.HITTable,
"idex": models.IDEXTable,
"swapi": models.SWAPITable,
"swe": models.SWETable,
"codice": models.CoDICETable,
"mag": models.MAGTable,
"glows": models.GLOWSTable,
}

# FilenameParser already confirmed that the file has a valid
# instrument name.
data = model_lookup[filename_parsed.instrument](**metadata_params)

# Add data to the corresponding instrument database
with Session(engine) as session:
session.add(data)
session.commit()
1 change: 1 addition & 0 deletions sds_data_manager/stacks/sds_data_manager_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def __init__(
environment={
"DATA_TRACKER_INDEX": "data_tracker",
"S3_DATA_BUCKET": self.data_bucket.s3_url_for_object(),
"SECRET_NAME": db_secret_name,
},
)

Expand Down

0 comments on commit 47b8a9a

Please sign in to comment.