Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 59 additions & 17 deletions cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers
from mdps_ds_lib.lib.utils.json_validator import JsonValidator
from mdps_ds_lib.stac_fast_api_client.sfa_client_factory import SFAClientFactory

from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
from mdps_ds_lib.lib.aws.aws_sns import AwsSns
Expand Down Expand Up @@ -147,25 +148,46 @@ def send_to_daac(self, event: dict):
self.send_to_daac_internal(uds_cnm_json)
return

def receive_from_daac(self, event: dict):
LOGGER.debug(f'receive_from_daac#event: {event}')
sns_msg = AwsMessageTransformers().sqs_sns(event)
LOGGER.debug(f'sns_msg: {sns_msg}')
cnm_notification_msg = sns_msg
def update_stac(self, cnm_notification_msg):
update_type = os.getenv('ARCHIVAL_STATUS_MECHANISM', '')
if not any([k for k in ['UDS', 'FAST_STAC'] if k == update_type]):
raise ValueError(f"missing ARCHIVAL_STATUS_MECHANISM environment variable or value is not {['UDS', 'FAST_STAC']}")
if update_type == 'UDS':
return self.update_stac_uds(cnm_notification_msg)
return self.update_stac_fast_api(cnm_notification_msg)

cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json')
cnm_msg_schema.raise_for_status()
cnm_msg_schema = json.loads(cnm_msg_schema.text)
result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg)
if result is not None:
raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}')
if 'response' not in cnm_notification_msg:
raise ValueError(f'missing response in {cnm_notification_msg}')
def update_stac_fast_api(self, cnm_notification_msg):
sfa_client = SFAClientFactory().get_instance_from_env()
collection_id, granule_id = ':'.join(cnm_notification_msg['identifier'].split(':')[:-1]), cnm_notification_msg['identifier']
# TODO assuming granule ID is URN:NASA:VENUE:TENANT:VENUE:COLLECTION_ID:COLLECTION_ID
existing_item = sfa_client.get_item(collection_id, granule_id)
# TODO handle error when no existing_item. Currently, it is requests.HTTPError with 404
if cnm_notification_msg['response']['status'] == 'SUCCESS':
latest_daac_status = {
'archive_status': 'cnm_r_success',
'archive_error_message': '',
'archive_error_code': '',
}
else:
latest_daac_status = {
'archive_status': 'cnm_r_failed',
'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown',
'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown',
}
latest_daac_status['event_time'] = TimeUtils.get_current_time()
existing_item['properties']['archival_statuses'] = existing_item['properties']['archival_statuses'] + [latest_daac_status] if 'archival_statuses' in existing_item['properties'] else [latest_daac_status]
updated_item = sfa_client.update_item(collection_id, granule_id, existing_item, update_whole=True) # TODO partial update via patch is not working at this moment.
return

def update_stac_uds(self, cnm_notification_msg):
granule_identifier = UdsCollections.decode_identifier(cnm_notification_msg['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
try:
existing_granule_object = self.__granules_index.get_entry(granule_identifier.tenant, granule_identifier.venue, cnm_notification_msg['identifier'])
existing_granule_object = self.__granules_index.get_entry(granule_identifier.tenant,
granule_identifier.venue,
cnm_notification_msg['identifier'])
except Exception as e:
LOGGER.exception(f"error while attempting to retrieve existing record: {cnm_notification_msg['identifier']}, not continuing")
LOGGER.exception(
f"error while attempting to retrieve existing record: {cnm_notification_msg['identifier']}, not continuing")
return
LOGGER.debug(f'existing_granule_object: {existing_granule_object}')
if cnm_notification_msg['response']['status'] == 'SUCCESS':
Expand All @@ -177,7 +199,27 @@ def receive_from_daac(self, event: dict):
return
self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
'archive_status': 'cnm_r_failed',
'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown',
'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown',
'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in
cnm_notification_msg[
'response'] else 'unknown',
'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg[
'response'] else 'unknown',
}, cnm_notification_msg['identifier'])
return

def receive_from_daac(self, event: dict):
LOGGER.debug(f'receive_from_daac#event: {event}')
sns_msg = AwsMessageTransformers().sqs_sns(event)
LOGGER.debug(f'sns_msg: {sns_msg}')
cnm_notification_msg = sns_msg

cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json')
cnm_msg_schema.raise_for_status()
cnm_msg_schema = json.loads(cnm_msg_schema.text)
result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg)
if result is not None:
raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}')
if 'response' not in cnm_notification_msg:
raise ValueError(f'missing response in {cnm_notification_msg}')
self.update_stac(cnm_notification_msg)
return
7 changes: 3 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ certifi==2024.8.30
charset-normalizer==3.3.2
click==8.1.7
dateparser==1.2.0
elasticsearch==7.13.4
exceptiongroup==1.2.2
fastapi==0.115.0
fastjsonschema==2.20.0
Expand All @@ -15,7 +14,7 @@ jsonschema==4.23.0
jsonschema-specifications==2023.12.1
lark==0.12.0
mangum==0.18.0
mdps-ds-lib==1.1.1.dev800
mdps-ds-lib==1.2.0.dev100
pydantic==2.9.2
pydantic_core==2.23.4
pygeofilter==0.2.4
Expand All @@ -26,7 +25,7 @@ python-dotenv==1.0.1
pytz==2024.2
referencing==0.35.1
regex==2024.9.11
requests==2.31.0
requests==2.32.5
requests-aws4auth==1.2.3
rpds-py==0.20.0
six==1.16.0
Expand All @@ -35,6 +34,6 @@ starlette==0.38.6
tenacity==8.2.3
typing_extensions==4.12.2
tzlocal==5.2
urllib3==1.26.11
urllib3==1.26.20
uvicorn==0.30.6
xmltodict==0.13.0
7 changes: 7 additions & 0 deletions tf-module/unity-cumulus/daac_archiver.tf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ resource "aws_lambda_function" "daac_archiver_response" {
LOG_LEVEL = var.log_level
ES_URL = aws_elasticsearch_domain.uds-es.endpoint
ES_PORT = 443
ARCHIVAL_STATUS_MECHANISM = "UDS" # UDS or FAST_STAC
DS_URL = 'TODO'
SFA_USERNAME = 'TODO'
SFA_PASSWORD = 'TODO'
SFA_AUTH_KEY = 'TODO'
SFA_AUTH_VALUE = 'TODO'
SFA_BEARER_TOKEN = 'TODO'
}
}

Expand Down
Loading