Skip to content

Commit 7ae5ffb

Browse files
authored
fix: Collection deletion - Adding Cumulus Execution Deletions (#597)
* feat: collection deletion * fix: update collection to pass existing code validation * fix: expecting json dict * fix: deleting executions and wait for 10 seconds * fix: normal name to delete executions * fix: need name___version * fix: reduce batch size * fix: brute force delete retry * fix: delete rule first then collection * fix: get executions for collection * fix: delete only executions if existed
1 parent 9365112 commit 7ae5ffb

File tree

2 files changed

+124
-63
lines changed

2 files changed

+124
-63
lines changed

cumulus_lambda_functions/cumulus_wrapper/query_collections.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list):
3434
collection_names = [k.split('___')[0] for k in collection_ids]
3535
self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}')
3636
return self
37+
3738
def get_size(self, private_api_prefix: str):
3839
query_params = {'field': 'status', 'type': 'collections'}
3940
main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]}
@@ -191,6 +192,75 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str):
191192
return {'server_error': f'error while invoking:{str(e)}'}
192193
return {'status': query_result['message']}
193194

195+
def delete_executions(self, new_collection: dict, private_api_prefix: str, es_batch_size: int = 10000, db_batch_size: int = 50000):
    """
    Request a bulk deletion of all executions attached to one collection.

    Sends a POST to `/executions/bulk-delete-by-collection` through `self._invoke_api`.
    Roughly equivalent to:
    curl --request POST "$CUMULUS_BASEURL/executions/bulk-delete-by-collection" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{"collectionId": "NAME___VERSION", "esBatchSize": 10000, "dbBatchSize": 50000}'

    :param new_collection: collection document; only its `name` and `version` keys are read here,
        and they are joined as `name___version` to form the `collectionId`.
    :param private_api_prefix: forwarded unchanged to `self._invoke_api`.
    :param es_batch_size: value sent as `esBatchSize` in the request body.
    :param db_batch_size: value sent as `dbBatchSize` in the request body.
    :return: one of
        - `{'status': <parsed response body>}` when the parsed body contains an `id` key
          (presumably the id of the asynchronous delete operation - confirm against the Cumulus API docs),
        - `{'client_error': <raw response>}` for a 4xx status code,
        - `{'server_error': ...}` for a 5xx status code, a body without `id`, or any exception raised
          while invoking the API or parsing its response.
    """
    request_body = {
        'collectionId': f'{new_collection["name"]}___{new_collection["version"]}',
        'esBatchSize': es_batch_size,
        'dbBatchSize': db_batch_size,
    }
    payload = {
        'httpMethod': 'POST',
        'resource': '/{proxy+}',
        'path': '/executions/bulk-delete-by-collection',
        'headers': {
            'Content-Type': 'application/json',
        },
        'body': json.dumps(request_body),
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        query_result = self._invoke_api(payload, private_api_prefix)
        # example of a failed raw response: {'statusCode': 500, 'body': '', 'headers': {}}
        if query_result['statusCode'] >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if query_result['statusCode'] >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'id' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        # Callers inspect the returned dict instead of catching exceptions,
        # so every failure is converted into a 'server_error' entry.
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'status': query_result}
231+
232+
def list_executions(self, new_collection: dict, private_api_prefix: str, limit: int = 100):
    """
    List executions that belong to one collection.

    Sends a GET to `/executions` through `self._invoke_api`, filtered by `collectionId`.
    Roughly equivalent to:
    curl --request GET "$CUMULUS_BASEURL/executions?limit=100&collectionId=NAME___VERSION" --header "Authorization: Bearer $cumulus_token"

    Only a single page is requested; no pagination is performed here.

    :param new_collection: collection document; only its `name` and `version` keys are read here,
        and they are joined as `name___version` to form the `collectionId`.
    :param private_api_prefix: forwarded unchanged to `self._invoke_api`.
    :param limit: page size sent as the `limit` query parameter (sent as a string).
    :return: one of
        - `{'results': <list from the parsed response body>}` on success,
        - `{'client_error': <raw response>}` for a 4xx status code,
        - `{'server_error': ...}` for a 5xx status code, a body without `results`, or any exception
          raised while invoking the API or parsing its response.
        NOTE: the `results` key is absent in every error case, so callers must check for it
        before indexing.
    """
    payload = {
        'httpMethod': 'GET',
        'resource': '/{proxy+}',
        'path': '/executions',
        'queryStringParameters': {'limit': str(limit), 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'},
        'headers': {
            'Content-Type': 'application/json',
        },
    }
    LOGGER.debug(f'payload: {payload}')
    try:
        query_result = self._invoke_api(payload, private_api_prefix)
        # example of a failed raw response: {'statusCode': 500, 'body': '', 'headers': {}}
        if query_result['statusCode'] >= 500:
            LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'server_error': query_result}
        if query_result['statusCode'] >= 400:
            LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}')
            return {'client_error': query_result}
        query_result = json.loads(query_result['body'])
        LOGGER.debug(f'json query_result: {query_result}')
        if 'results' not in query_result:
            return {'server_error': f'invalid response: {query_result}'}
    except Exception as e:
        # Callers inspect the returned dict instead of catching exceptions,
        # so every failure is converted into a 'server_error' entry.
        LOGGER.exception('error while invoking')
        return {'server_error': f'error while invoking:{str(e)}'}
    return {'results': query_result['results']}
263+
194264
def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800):
195265
"""
196266
curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{

cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py

Lines changed: 54 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import os
3+
from time import sleep
34
from typing import Optional
45

56
import pystac
@@ -81,68 +82,17 @@ def __init__(self, request_body):
8182
self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True)
8283
self.__cumulus_collection_query = CollectionsQuery('', '')
8384

84-
def __delete_collection_cumulus(self, cumulus_collection_doc):
85-
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
86-
if 'status' not in delete_result:
87-
LOGGER.error(f'status not in creation_result: {delete_result}')
85+
def analyze_cumulus_result(self, cumulus_request_result):
86+
if 'status' not in cumulus_request_result:
87+
LOGGER.error(f'status not in cumulus_request_result: {cumulus_request_result}')
8888
return {
8989
'statusCode': 500,
9090
'body': {
91-
'message': delete_result
91+
'message': cumulus_request_result
9292
}
9393
}, None
94-
return None, delete_result
94+
return None, cumulus_request_result
9595

96-
def __create_collection_cumulus(self, cumulus_collection_doc):
97-
creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
98-
if 'status' not in creation_result:
99-
LOGGER.error(f'status not in creation_result: {creation_result}')
100-
return {
101-
'statusCode': 500,
102-
'body': {
103-
'message': creation_result
104-
}
105-
}, None
106-
return None, creation_result
107-
108-
def __create_rules_cumulus(self, cumulus_collection_doc):
109-
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
110-
cumulus_collection_doc,
111-
self.__cumulus_lambda_prefix,
112-
self.__ingest_sqs_url,
113-
self.__provider_id,
114-
self.__workflow_name,
115-
)
116-
if 'status' not in rule_creation_result:
117-
LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_creation_result}')
118-
delete_collection_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix,
119-
cumulus_collection_doc['name'],
120-
cumulus_collection_doc['version'])
121-
self.__uds_collection.delete_collection(self.__collection_transformer.get_collection_id())
122-
return {
123-
'statusCode': 500,
124-
'body': {
125-
'message': rule_creation_result,
126-
'details': f'collection deletion result: {delete_collection_result}'
127-
}
128-
}
129-
return None
130-
131-
def __delete_rules_cumulus(self, cumulus_collection_doc):
132-
rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(
133-
cumulus_collection_doc,
134-
self.__cumulus_lambda_prefix
135-
)
136-
if 'status' not in rule_deletion_result:
137-
LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_deletion_result}')
138-
return {
139-
'statusCode': 500,
140-
'body': {
141-
'message': rule_deletion_result,
142-
'details': f'collection deletion result: {rule_deletion_result}'
143-
}
144-
}
145-
return None
14696

14797
def __delete_collection_uds(self):
14898
try:
@@ -189,17 +139,36 @@ def __create_collection_uds(self, cumulus_collection_doc):
189139
def delete(self):
190140
deletion_result = {}
191141
try:
142+
192143
cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
193144
self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
194145
LOGGER.debug(f'__provider_id: {self.__provider_id}')
195146
creation_result = 'NA'
196147

197148
if self.__include_cumulus:
198-
rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc)
199-
deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded'
200-
delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc)
149+
result = self.__cumulus_collection_query.list_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
150+
LOGGER.debug(f'execution list result: {result}')
151+
if len(result['results']) > 0:
152+
self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
153+
return {
154+
'statusCode': 409,
155+
'body': {
156+
'message': f'There are cumulus executions for this collection. Deleting them. Pls try again in a few minutes.',
157+
}
158+
}
159+
# self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
160+
self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
161+
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
162+
delete_err, delete_result = self.analyze_cumulus_result(delete_result)
163+
if delete_err is not None:
164+
LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}')
165+
# self.__delete_collection_execution(cumulus_collection_doc, deletion_result)
166+
self.__delete_collection_rule(cumulus_collection_doc, deletion_result)
167+
delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version'])
168+
delete_err, delete_result = self.analyze_cumulus_result(delete_result)
201169
deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result
202170
else:
171+
deletion_result['cumulus_executions_deletion'] = 'NA'
203172
deletion_result['cumulus_rule_deletion'] = 'NA'
204173
deletion_result['cumulus_collection_deletion'] = 'NA'
205174

@@ -222,23 +191,45 @@ def delete(self):
222191
}
223192
}
224193

194+
def __delete_collection_rule(self, cumulus_collection_doc, deletion_result):
    """
    Delete the SQS rule of a collection and record the outcome in `deletion_result`.

    The outcome is stored under `deletion_result['cumulus_rule_deletion']`: the error
    document from `analyze_cumulus_result` on failure, otherwise the raw result.
    If a previous attempt already stored an outcome without a `statusCode` key
    (i.e. not an error document), nothing is done, so a retry does not delete twice.

    :param cumulus_collection_doc: collection document passed to `delete_sqs_rules`.
    :param deletion_result: dict mutated in place with the outcome.
    :return: None
    """
    rule_key = 'cumulus_rule_deletion'
    already_succeeded = rule_key in deletion_result and 'statusCode' not in deletion_result[rule_key]
    if not already_succeeded:
        raw_outcome = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix)
        failure, success = self.analyze_cumulus_result(raw_outcome)
        deletion_result[rule_key] = success if failure is None else failure
201+
202+
def __delete_collection_execution(self, cumulus_collection_doc, deletion_result):
    """
    Trigger deletion of a collection's executions and record the outcome in `deletion_result`.

    The outcome is stored under `deletion_result['cumulus_executions_deletion']`: the error
    document from `analyze_cumulus_result` on failure, otherwise the raw result.
    The call then blocks for 10 seconds before returning, whether or not the request succeeded
    (presumably to give the deletion time to make progress - confirm with the author).

    :param cumulus_collection_doc: collection document passed to `delete_executions`.
    :param deletion_result: dict mutated in place with the outcome.
    :return: None
    """
    raw_outcome = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix)
    failure, success = self.analyze_cumulus_result(raw_outcome)
    if failure is None:
        deletion_result['cumulus_executions_deletion'] = success
    else:
        deletion_result['cumulus_executions_deletion'] = failure
    sleep(10)
225208
def create(self):
226209
try:
227210
cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body)
228211
self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider
229212
LOGGER.debug(f'__provider_id: {self.__provider_id}')
230213
creation_result = 'NA'
231214
if self.__include_cumulus:
232-
creation_err, creation_result = self.__create_collection_cumulus(cumulus_collection_doc)
215+
creation_cumulus_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix)
216+
creation_err, creation_result = self.analyze_cumulus_result(creation_cumulus_result)
233217
if creation_err is not None:
234218
return creation_err
235219
uds_creation_result = self.__create_collection_uds(cumulus_collection_doc)
236220
if uds_creation_result is not None:
237221
return uds_creation_result
238222
if self.__include_cumulus:
239-
create_rule_result = self.__create_rules_cumulus(cumulus_collection_doc)
240-
if create_rule_result is not None:
241-
return create_rule_result
223+
rule_creation_result = self.__cumulus_collection_query.create_sqs_rules(
224+
cumulus_collection_doc,
225+
self.__cumulus_lambda_prefix,
226+
self.__ingest_sqs_url,
227+
self.__provider_id,
228+
self.__workflow_name,
229+
)
230+
create_rule_err, create_rule_result = self.analyze_cumulus_result(rule_creation_result)
231+
if create_rule_err is not None:
232+
return create_rule_err
242233
# validation_result = pystac.Collection.from_dict(self.__request_body).validate()
243234
# cumulus_collection_query = CollectionsQuery('', '')
244235
#

0 commit comments

Comments
 (0)