Skip to content

Commit 91582ac

Browse files
committed
feat: daac maap archive granule endpoint
1 parent 850ca46 commit 91582ac

File tree

7 files changed

+224
-0
lines changed

7 files changed

+224
-0
lines changed

cumulus_lambda_functions/cumulus_es_setup/es_setup.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import os
22

3+
import requests
4+
5+
from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping
36
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
47

58
from mdps_ds_lib.lib.aws.es_abstract import ESAbstract
@@ -23,6 +26,36 @@ def __init__(self):
2326
port=int(os.getenv('ES_PORT', '443'))
2427
)
2528

29+
def setup_maap_daac_index(self):
30+
stac_fast_version = '6.0.0'
31+
url = f"https://raw.githubusercontent.com/stac-utils/stac-fastapi-elasticsearch-opensearch/refs/tags/v{stac_fast_version}/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py"
32+
resp = requests.get(url)
33+
resp.raise_for_status()
34+
35+
code = resp.text
36+
namespace = {}
37+
exec(code, namespace)
38+
es_items_mappings = namespace["ES_ITEMS_MAPPINGS"]
39+
LOGGER.debug(f'stac fast API es_items_mappings: {es_items_mappings}')
40+
es_items_mappings['properties'] = {
41+
**GranulesIndexMapping.percolator_mappings,
42+
**es_items_mappings['properties'],
43+
}
44+
index_mapping = {
45+
"settings": {
46+
"number_of_shards": 3,
47+
"number_of_replicas": 2
48+
},
49+
"mappings": es_items_mappings
50+
}
51+
index_name = f'{GranulesIndexMapping.daac_percolator_name}--{stac_fast_version.replace(".", "-")}'
52+
try:
53+
self.__es.create_index(index_name, index_mapping)
54+
self.__es.create_alias(index_name, GranulesIndexMapping.daac_percolator_name)
55+
except:
56+
LOGGER.exception(f'failed to create index / alias for: {GranulesIndexMapping.daac_percolator_name}')
57+
return self
58+
2659
def get_index_mapping(self, index_name: str):
2760
if not hasattr(es_mappings, index_name):
2861
raise ValueError(f'missing index_name: {index_name}')

cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
from time import sleep
44

55
import requests
6+
from mdps_ds_lib.lib.cumulus_stac.item_transformer import ItemTransformer
67
from mdps_ds_lib.lib.utils.file_utils import FileUtils
78

89
from mdps_ds_lib.lib.aws.aws_s3 import AwsS3
910

1011
from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers
1112
from mdps_ds_lib.lib.utils.json_validator import JsonValidator
13+
from pystac import Item
1214

1315
from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex
1416
from mdps_ds_lib.lib.aws.aws_sns import AwsSns
@@ -147,6 +149,85 @@ def send_to_daac(self, event: dict):
147149
self.send_to_daac_internal(uds_cnm_json)
148150
return
149151

152+
def __extract_files_maap(self, asset_dict, daac_config):
153+
result_files = []
154+
# https://github.com/podaac/cloud-notification-message-schema
155+
for k, v in asset_dict.items():
156+
# if v.roles[0]['type'] not in archiving_types:
157+
# continue
158+
temp = {
159+
'type': v.roles[0],
160+
'uri': self.revert_to_s3_url(v.href),
161+
'name': os.path.basename(v.href),
162+
'checksumType': 'md5',
163+
'checksum': v.extra_fields['file:checksum'],
164+
'size': v.extra_fields['file:size']
165+
}
166+
result_files.append(temp) # TODO remove missing md5?
167+
168+
if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1:
169+
return result_files # TODO remove missing md5?
170+
archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']}
171+
result_files1 = []
172+
for each_file in result_files:
173+
if each_file['type'] not in archiving_types:
174+
continue
175+
file_extensions = archiving_types[each_file['type']]
176+
if len(file_extensions) < 1:
177+
result_files1.append(each_file) # TODO remove missing md5?
178+
continue
179+
temp_filename = each_file['name'].upper().strip()
180+
if any([temp_filename.endswith(k.upper()) for k in file_extensions]):
181+
result_files1.append(each_file) # TODO remove missing md5?
182+
return result_files1
183+
184+
def send_to_daac_maap(self, granules_json):
185+
daac_configs = self.__archive_index_logic.percolate_maap_document(granules_json)
186+
if daac_configs is None or len(daac_configs) < 1:
187+
LOGGER.debug(f'this granule is not configured for archival: {granules_json}')
188+
return
189+
granules_item = Item.from_dict(granules_json)
190+
errors = []
191+
for each_daac_config in daac_configs:
192+
LOGGER.debug(f'working on {each_daac_config}')
193+
result = JsonValidator(UdsArchiveConfigIndex.db_record_schema).validate(each_daac_config)
194+
if result is not None:
195+
errors.append(f'each_daac_config does not have valid schema. Pls re-add the daac config: {result} for {each_daac_config}')
196+
continue
197+
try:
198+
self.__sns.set_topic_arn(each_daac_config['daac_sns_topic_arn'])
199+
daac_cnm_message = {
200+
"collection": {
201+
'name': each_daac_config['daac_collection_name'],
202+
'version': each_daac_config['daac_data_version'],
203+
},
204+
"identifier": granules_item.id,
205+
"submissionTime": f'{TimeUtils.get_current_time()}Z',
206+
"provider": each_daac_config['daac_provider'],
207+
"version": "1.6.0", # TODO this is hardcoded?
208+
"product": {
209+
"name": granules_item.id,
210+
# "dataVersion": daac_config['daac_data_version'],
211+
'files': self.__extract_files_maap(granules_item.assets, each_daac_config),
212+
}
213+
}
214+
LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}')
215+
self.__sns.set_external_role(each_daac_config['daac_role_arn'],
216+
each_daac_config['daac_role_session_name']).publish_message(
217+
json.dumps(daac_cnm_message), True)
218+
return {
219+
'archive_status': 'cnm_s_success',
220+
'archive_error_message': '',
221+
'archive_error_code': '',
222+
}
223+
except Exception as e:
224+
LOGGER.exception(f'failed during archival process')
225+
return {
226+
'archive_status': 'cnm_s_failed',
227+
'archive_error_message': str(e),
228+
}
229+
return
230+
150231
def receive_from_daac(self, event: dict):
151232
LOGGER.debug(f'receive_from_daac#event: {event}')
152233
sns_msg = AwsMessageTransformers().sqs_sns(event)

cumulus_lambda_functions/granules_to_es/granules_index_mapping.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
class GranulesIndexMapping:
2+
daac_percolator_name = 'uds_maap_percolator'
23
archiving_keys = [
34
'archive_status', 'archive_error_message', 'archive_error_code'
45
]

cumulus_lambda_functions/lib/uds_db/archive_index.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from copy import deepcopy
22

3+
from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping
34
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
45

56
from mdps_ds_lib.lib.utils.json_validator import JsonValidator
@@ -56,6 +57,29 @@ def __init__(self, es_url, es_port=443, es_type='AWS', use_ssl=True):
5657
port=es_port)
5758
self.__tenant, self.__venue = '', ''
5859

60+
def percolate_maap_document(self, document):
61+
dsl = {
62+
'size': 9999,
63+
# '_source': ['ss_name', 'ss_type', 'ss_username'],
64+
'query': {
65+
'percolate': {
66+
'field': 'ss_query',
67+
'document': document,
68+
}
69+
},
70+
# 'sort': [{'ss_name': {'order': 'asc'}}]
71+
}
72+
try:
73+
percolated_result = self.__es.query(dsl, querying_index=GranulesIndexMapping.daac_percolator_name)
74+
except Exception as e:
75+
if e.error == 'resource_not_found_exception':
76+
LOGGER.debug(f'unable to find document: {document} on index: {GranulesIndexMapping.daac_percolator_name}')
77+
return None
78+
LOGGER.exception(f'error while percolating')
79+
raise e
80+
percolated_result = [k['_source'] for k in percolated_result['hits']['hits']]
81+
return percolated_result
82+
5983
def percolate_document(self, document_id):
6084
write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}'.lower().strip()
6185
current_alias = self.__es.get_alias(write_alias_name)

cumulus_lambda_functions/uds_api/granules_archive_api.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import os
33

4+
from cumulus_lambda_functions.daac_archiver.daac_archiver_logic import DaacArchiverLogic
45
from cumulus_lambda_functions.uds_api.dapa.granules_dapa_query_es import GranulesDapaQueryEs
56
from cumulus_lambda_functions.uds_api.dapa.pagination_links_generator import PaginationLinksGenerator
67

@@ -21,6 +22,7 @@
2122

2223
from fastapi import APIRouter, HTTPException, Request
2324

25+
from cumulus_lambda_functions.uds_api.granules_api import StacGranuleModel
2426
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants
2527

2628
LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())
@@ -143,6 +145,30 @@ async def dapa_archive_get_config(request: Request, collection_id: str):
143145
return add_result['body']
144146
raise HTTPException(status_code=add_result['statusCode'], detail=add_result['body'])
145147

148+
@router.post("/{collection_id}/archive/{granule_id}")
149+
@router.post("/{collection_id}/archive/{granule_id}/")
150+
async def archive_single_granule_dapa(request: Request, collection_id: str, granule_id: str, granule: StacGranuleModel):
151+
authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
152+
.get_instance(UDSAuthorizerFactory.cognito,
153+
es_url=os.getenv('ES_URL'),
154+
es_port=int(os.getenv('ES_PORT', '443'))
155+
)
156+
auth_info = FastApiUtils.get_authorization_info(request)
157+
collection_identifier = UdsCollections.decode_identifier(collection_id)
158+
if not authorizer.is_authorized_for_collection(DBConstants.read, collection_id,
159+
auth_info['ldap_groups'],
160+
collection_identifier.tenant,
161+
collection_identifier.venue):
162+
LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
163+
raise HTTPException(status_code=403, detail=json.dumps({
164+
'message': 'not authorized to execute this action'
165+
}))
166+
new_granule = granule.model_dump()
167+
update_result = DaacArchiverLogic().send_to_daac_maap(new_granule)
168+
169+
return
170+
171+
146172
@router.put("/{collection_id}/archive/{granule_id}")
147173
@router.put("/{collection_id}/archive/{granule_id}/")
148174
async def archive_single_granule_dapa(request: Request, collection_id: str, granule_id: str):

cumulus_lambda_functions/uds_api/system_admin_api.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,25 @@ async def es_setup(request: Request, tenant: Union[str, None]=None, venue: Union
3636
LOGGER.exception(f'')
3737
raise HTTPException(status_code=500, detail=str(e))
3838
return {'message': 'successful'}
39+
40+
41+
@router.put("/maap_daac_config_setup")
42+
@router.put("/maap_daac_config_setup/")
43+
async def maap_daac_config_setup(request: Request, tenant: Union[str, None]=None, venue: Union[str, None]=None, group_names: Union[str, None]=None):
44+
LOGGER.debug(f'started maap_daac_config_setup')
45+
auth_info = FastApiUtils.get_authorization_info(request)
46+
query_body = {
47+
'tenant': tenant,
48+
'venue': venue,
49+
'ldap_group_names': group_names if group_names is None else [k.strip() for k in group_names.split(',')],
50+
}
51+
auth_crud = AuthCrud(auth_info, query_body)
52+
is_admin_result = auth_crud.is_admin()
53+
if is_admin_result['statusCode'] != 200:
54+
raise HTTPException(status_code=is_admin_result['statusCode'], detail=is_admin_result['body'])
55+
try:
56+
SetupESIndexAlias().setup_maap_daac_index()
57+
except Exception as e:
58+
LOGGER.exception(f'')
59+
raise HTTPException(status_code=500, detail=str(e))
60+
return {'message': 'successful'}

cumulus_lambda_functions/uds_api/web_service.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import httpx
2+
from fastapi import Response
3+
14
from fastapi.staticfiles import StaticFiles
25

36
from cumulus_lambda_functions.uds_api.fast_api_utils import FastApiUtils
@@ -64,6 +67,40 @@ async def get_open_api(request: Request):
6467
default_open_api_doc['paths'].pop(k)
6568
return app.openapi()
6669

70+
71+
# NOTE: This is how you create a proxy in Fast API.
72+
73+
# BACKEND_URL = 'http://localhost:8080/' # TODO make sure it ends with '/'
74+
# @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
75+
# async def proxy(full_path: str, request: Request):
76+
# # Construct full target URL
77+
# fast_api_path = full_path.replace(f'{api_base_prefix}/', '')
78+
# target_url = f"{BACKEND_URL}{fast_api_path}"
79+
# print(f'full_path = {full_path}')
80+
# print(f'target_url = {target_url}')
81+
# # Prepare the request
82+
# method = request.method
83+
# headers = dict(request.headers)
84+
# body = await request.body()
85+
#
86+
# async with httpx.AsyncClient() as client:
87+
# backend_response = await client.request(
88+
# method,
89+
# target_url,
90+
# content=body,
91+
# headers=headers,
92+
# params=request.query_params
93+
# )
94+
#
95+
# # Return the response from the backend
96+
# return Response(
97+
# content=backend_response.content,
98+
# status_code=backend_response.status_code,
99+
# headers=dict(backend_response.headers),
100+
# )
101+
#
102+
103+
67104
# to make it work with Amazon Lambda, we create a handler object
68105
handler = Mangum(app=app)
69106

0 commit comments

Comments
 (0)