1111from mdps_ds_lib .lib .aws .aws_message_transformers import AwsMessageTransformers
1212from mdps_ds_lib .lib .utils .json_validator import JsonValidator
1313from pystac import Item
14+ from mdps_ds_lib .stac_fast_api_client .sfa_client_factory import SFAClientFactory
1415
1516from cumulus_lambda_functions .lib .uds_db .granules_db_index import GranulesDbIndex
1617from mdps_ds_lib .lib .aws .aws_sns import AwsSns
@@ -228,25 +229,46 @@ def send_to_daac_maap(self, granules_json):
228229 }
229230 return
230231
231- def receive_from_daac (self , event : dict ):
232- LOGGER .debug (f'receive_from_daac#event: { event } ' )
233- sns_msg = AwsMessageTransformers ().sqs_sns (event )
234- LOGGER .debug (f'sns_msg: { sns_msg } ' )
235- cnm_notification_msg = sns_msg
232+ def update_stac (self , cnm_notification_msg ):
233+ update_type = os .getenv ('ARCHIVAL_STATUS_MECHANISM' , '' )
234+ if not any ([k for k in ['UDS' , 'FAST_STAC' ] if k == update_type ]):
235+ raise ValueError (f"missing ARCHIVAL_STATUS_MECHANISM environment variable or value is not { ['UDS' , 'FAST_STAC' ]} " )
236+ if update_type == 'UDS' :
237+ return self .update_stac_uds (cnm_notification_msg )
238+ return self .update_stac_fast_api (cnm_notification_msg )
236239
237- cnm_msg_schema = requests .get ('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json' )
238- cnm_msg_schema .raise_for_status ()
239- cnm_msg_schema = json .loads (cnm_msg_schema .text )
240- result = JsonValidator (cnm_msg_schema ).validate (cnm_notification_msg )
241- if result is not None :
242- raise ValueError (f'input cnm event has cnm_msg_schema validation errors: { result } ' )
243- if 'response' not in cnm_notification_msg :
244- raise ValueError (f'missing response in { cnm_notification_msg } ' )
240+ def update_stac_fast_api (self , cnm_notification_msg ):
241+ sfa_client = SFAClientFactory ().get_instance_from_env ()
242+ collection_id , granule_id = ':' .join (cnm_notification_msg ['identifier' ].split (':' )[:- 1 ]), cnm_notification_msg ['identifier' ]
243+ # TODO assuming granule ID is URN:NASA:VENUE:TENANT:VENUE:COLLECTION_ID:COLLECTION_ID
244+ existing_item = sfa_client .get_item (collection_id , granule_id )
245+ # TODO handle error when no existing_item. Currently, it is requests.HTTPError with 404
246+ if cnm_notification_msg ['response' ]['status' ] == 'SUCCESS' :
247+ latest_daac_status = {
248+ 'archive_status' : 'cnm_r_success' ,
249+ 'archive_error_message' : '' ,
250+ 'archive_error_code' : '' ,
251+ }
252+ else :
253+ latest_daac_status = {
254+ 'archive_status' : 'cnm_r_failed' ,
255+ 'archive_error_message' : cnm_notification_msg ['response' ]['errorMessage' ] if 'errorMessage' in cnm_notification_msg ['response' ] else 'unknown' ,
256+ 'archive_error_code' : cnm_notification_msg ['response' ]['errorCode' ] if 'errorCode' in cnm_notification_msg ['response' ] else 'unknown' ,
257+ }
258+ latest_daac_status ['event_time' ] = TimeUtils .get_current_time ()
259+ existing_item ['properties' ]['archival_statuses' ] = existing_item ['properties' ]['archival_statuses' ] + [latest_daac_status ] if 'archival_statuses' in existing_item ['properties' ] else [latest_daac_status ]
260+ updated_item = sfa_client .update_item (collection_id , granule_id , existing_item , update_whole = True ) # TODO partial update via patch is not working at this moment.
261+ return
262+
263+ def update_stac_uds (self , cnm_notification_msg ):
245264 granule_identifier = UdsCollections .decode_identifier (cnm_notification_msg ['identifier' ]) # This is normally meant to be for collection. Since our granule ID also has collection id prefix. we can use this.
246265 try :
247- existing_granule_object = self .__granules_index .get_entry (granule_identifier .tenant , granule_identifier .venue , cnm_notification_msg ['identifier' ])
266+ existing_granule_object = self .__granules_index .get_entry (granule_identifier .tenant ,
267+ granule_identifier .venue ,
268+ cnm_notification_msg ['identifier' ])
248269 except Exception as e :
249- LOGGER .exception (f"error while attempting to retrieve existing record: { cnm_notification_msg ['identifier' ]} , not continuing" )
270+ LOGGER .exception (
271+ f"error while attempting to retrieve existing record: { cnm_notification_msg ['identifier' ]} , not continuing" )
250272 return
251273 LOGGER .debug (f'existing_granule_object: { existing_granule_object } ' )
252274 if cnm_notification_msg ['response' ]['status' ] == 'SUCCESS' :
@@ -258,7 +280,27 @@ def receive_from_daac(self, event: dict):
258280 return
259281 self .__granules_index .update_entry (granule_identifier .tenant , granule_identifier .venue , {
260282 'archive_status' : 'cnm_r_failed' ,
261- 'archive_error_message' : cnm_notification_msg ['response' ]['errorMessage' ] if 'errorMessage' in cnm_notification_msg ['response' ] else 'unknown' ,
262- 'archive_error_code' : cnm_notification_msg ['response' ]['errorCode' ] if 'errorCode' in cnm_notification_msg ['response' ] else 'unknown' ,
283+ 'archive_error_message' : cnm_notification_msg ['response' ]['errorMessage' ] if 'errorMessage' in
284+ cnm_notification_msg [
285+ 'response' ] else 'unknown' ,
286+ 'archive_error_code' : cnm_notification_msg ['response' ]['errorCode' ] if 'errorCode' in cnm_notification_msg [
287+ 'response' ] else 'unknown' ,
263288 }, cnm_notification_msg ['identifier' ])
264289 return
290+
291+ def receive_from_daac (self , event : dict ):
292+ LOGGER .debug (f'receive_from_daac#event: { event } ' )
293+ sns_msg = AwsMessageTransformers ().sqs_sns (event )
294+ LOGGER .debug (f'sns_msg: { sns_msg } ' )
295+ cnm_notification_msg = sns_msg
296+
297+ cnm_msg_schema = requests .get ('https://raw.githubusercontent.com/podaac/cloud-notification-message-schema/v1.6.1/cumulus_sns_schema.json' )
298+ cnm_msg_schema .raise_for_status ()
299+ cnm_msg_schema = json .loads (cnm_msg_schema .text )
300+ result = JsonValidator (cnm_msg_schema ).validate (cnm_notification_msg )
301+ if result is not None :
302+ raise ValueError (f'input cnm event has cnm_msg_schema validation errors: { result } ' )
303+ if 'response' not in cnm_notification_msg :
304+ raise ValueError (f'missing response in { cnm_notification_msg } ' )
305+ self .update_stac (cnm_notification_msg )
306+ return
0 commit comments