Skip to content

Commit 55c9500

Browse files
authored
feat:Daac response fastapi (#608)
* feat: add logic to fastapi update * fix: update terraform
1 parent 850ca46 commit 55c9500

File tree

3 files changed

+69
-21
lines changed

3 files changed

+69
-21
lines changed

cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers
1111
from mdps_ds_lib.lib.utils.json_validator import JsonValidator
12+
from mdps_ds_lib.stac_fast_api_client.sfa_client_factory import SFAClientFactory
1213

1314
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
1415
from mdps_ds_lib.lib.aws.aws_sns import AwsSns
@@ -147,25 +148,46 @@ def send_to_daac(self, event: dict):
147148
self.send_to_daac_internal(uds_cnm_json)
148149
return
149150

150-
def receive_from_daac(self, event: dict):
151-
LOGGER.debug(f'receive_from_daac#event: {event}')
152-
sns_msg = AwsMessageTransformers().sqs_sns(event)
153-
LOGGER.debug(f'sns_msg: {sns_msg}')
154-
cnm_notification_msg = sns_msg
151+
def update_stac(self, cnm_notification_msg):
152+
update_type = os.getenv('ARCHIVAL_STATUS_MECHANISM', '')
153+
if not any([k for k in ['UDS', 'FAST_STAC'] if k == update_type]):
154+
raise ValueError(f"missing ARCHIVAL_STATUS_MECHANISM environment variable or value is not {['UDS', 'FAST_STAC']}")
155+
if update_type == 'UDS':
156+
return self.update_stac_uds(cnm_notification_msg)
157+
return self.update_stac_fast_api(cnm_notification_msg)
155158

156-
cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json')
157-
cnm_msg_schema.raise_for_status()
158-
cnm_msg_schema = json.loads(cnm_msg_schema.text)
159-
result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg)
160-
if result is not None:
161-
raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}')
162-
if 'response' not in cnm_notification_msg:
163-
raise ValueError(f'missing response in {cnm_notification_msg}')
159+
def update_stac_fast_api(self, cnm_notification_msg):
160+
sfa_client = SFAClientFactory().get_instance_from_env()
161+
collection_id, granule_id = ':'.join(cnm_notification_msg['identifier'].split(':')[:-1]), cnm_notification_msg['identifier']
162+
# TODO assuming granule ID is URN:NASA:VENUE:TENANT:VENUE:COLLECTION_ID:COLLECTION_ID
163+
existing_item = sfa_client.get_item(collection_id, granule_id)
164+
# TODO handle error when no existing_item. Currently, it is requests.HTTPError with 404
165+
if cnm_notification_msg['response']['status'] == 'SUCCESS':
166+
latest_daac_status = {
167+
'archive_status': 'cnm_r_success',
168+
'archive_error_message': '',
169+
'archive_error_code': '',
170+
}
171+
else:
172+
latest_daac_status = {
173+
'archive_status': 'cnm_r_failed',
174+
'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown',
175+
'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown',
176+
}
177+
latest_daac_status['event_time'] = TimeUtils.get_current_time()
178+
existing_item['properties']['archival_statuses'] = existing_item['properties']['archival_statuses'] + [latest_daac_status] if 'archival_statuses' in existing_item['properties'] else [latest_daac_status]
179+
updated_item = sfa_client.update_item(collection_id, granule_id, existing_item, update_whole=True) # TODO partial update via patch is not working at this moment.
180+
return
181+
182+
def update_stac_uds(self, cnm_notification_msg):
164183
granule_identifier = UdsCollections.decode_identifier(cnm_notification_msg['identifier']) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
165184
try:
166-
existing_granule_object = self.__granules_index.get_entry(granule_identifier.tenant, granule_identifier.venue, cnm_notification_msg['identifier'])
185+
existing_granule_object = self.__granules_index.get_entry(granule_identifier.tenant,
186+
granule_identifier.venue,
187+
cnm_notification_msg['identifier'])
167188
except Exception as e:
168-
LOGGER.exception(f"error while attempting to retrieve existing record: {cnm_notification_msg['identifier']}, not continuing")
189+
LOGGER.exception(
190+
f"error while attempting to retrieve existing record: {cnm_notification_msg['identifier']}, not continuing")
169191
return
170192
LOGGER.debug(f'existing_granule_object: {existing_granule_object}')
171193
if cnm_notification_msg['response']['status'] == 'SUCCESS':
@@ -177,7 +199,27 @@ def receive_from_daac(self, event: dict):
177199
return
178200
self.__granules_index.update_entry(granule_identifier.tenant, granule_identifier.venue, {
179201
'archive_status': 'cnm_r_failed',
180-
'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in cnm_notification_msg['response'] else 'unknown',
181-
'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg['response'] else 'unknown',
202+
'archive_error_message': cnm_notification_msg['response']['errorMessage'] if 'errorMessage' in
203+
cnm_notification_msg[
204+
'response'] else 'unknown',
205+
'archive_error_code': cnm_notification_msg['response']['errorCode'] if 'errorCode' in cnm_notification_msg[
206+
'response'] else 'unknown',
182207
}, cnm_notification_msg['identifier'])
183208
return
209+
210+
def receive_from_daac(self, event: dict):
211+
LOGGER.debug(f'receive_from_daac#event: {event}')
212+
sns_msg = AwsMessageTransformers().sqs_sns(event)
213+
LOGGER.debug(f'sns_msg: {sns_msg}')
214+
cnm_notification_msg = sns_msg
215+
216+
cnm_msg_schema = requests.get('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json')
217+
cnm_msg_schema.raise_for_status()
218+
cnm_msg_schema = json.loads(cnm_msg_schema.text)
219+
result = JsonValidator(cnm_msg_schema).validate(cnm_notification_msg)
220+
if result is not None:
221+
raise ValueError(f'input cnm event has cnm_msg_schema validation errors: {result}')
222+
if 'response' not in cnm_notification_msg:
223+
raise ValueError(f'missing response in {cnm_notification_msg}')
224+
self.update_stac(cnm_notification_msg)
225+
return

requirements.txt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ certifi==2024.8.30
55
charset-normalizer==3.3.2
66
click==8.1.7
77
dateparser==1.2.0
8-
elasticsearch==7.13.4
98
exceptiongroup==1.2.2
109
fastapi==0.115.0
1110
fastjsonschema==2.20.0
@@ -15,7 +14,7 @@ jsonschema==4.23.0
1514
jsonschema-specifications==2023.12.1
1615
lark==0.12.0
1716
mangum==0.18.0
18-
mdps-ds-lib==1.1.1.dev800
17+
mdps-ds-lib==1.2.0.dev100
1918
pydantic==2.9.2
2019
pydantic_core==2.23.4
2120
pygeofilter==0.2.4
@@ -26,7 +25,7 @@ python-dotenv==1.0.1
2625
pytz==2024.2
2726
referencing==0.35.1
2827
regex==2024.9.11
29-
requests==2.31.0
28+
requests==2.32.5
3029
requests-aws4auth==1.2.3
3130
rpds-py==0.20.0
3231
six==1.16.0
@@ -35,6 +34,6 @@ starlette==0.38.6
3534
tenacity==8.2.3
3635
typing_extensions==4.12.2
3736
tzlocal==5.2
38-
urllib3==1.26.11
37+
urllib3==1.26.20
3938
uvicorn==0.30.6
4039
xmltodict==0.13.0

tf-module/unity-cumulus/daac_archiver.tf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ resource "aws_lambda_function" "daac_archiver_response" {
4444
LOG_LEVEL = var.log_level
4545
ES_URL = aws_elasticsearch_domain.uds-es.endpoint
4646
ES_PORT = 443
47+
ARCHIVAL_STATUS_MECHANISM = "UDS" # UDS or FAST_STAC
48+
DS_URL = 'TODO'
49+
SFA_USERNAME = 'TODO'
50+
SFA_PASSWORD = 'TODO'
51+
SFA_AUTH_KEY = 'TODO'
52+
SFA_AUTH_VALUE = 'TODO'
53+
SFA_BEARER_TOKEN = 'TODO'
4754
}
4855
}
4956

0 commit comments

Comments
 (0)