|
1 | 1 | import os |
2 | 2 | import time |
| 3 | +from concurrent.futures import ThreadPoolExecutor, as_completed |
3 | 4 |
|
4 | 5 | from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers |
5 | 6 | from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections |
|
10 | 11 |
|
11 | 12 | from mdps_ds_lib.lib.cumulus_stac.unity_collection_stac import UnityCollectionStac |
12 | 13 | from cumulus_lambda_functions.uds_api.dapa.collections_dapa_creation import CollectionDapaCreation |
13 | | -from mdps_ds_lib.lib.cumulus_stac.item_transformer import ItemTransformer |
14 | 14 | from pystac import ItemCollection, Item |
15 | 15 | from mdps_ds_lib.lib.utils.file_utils import FileUtils |
16 | 16 | from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator |
@@ -118,41 +118,58 @@ def extract_collection_id(self): |
118 | 118 | if len(self.successful_features.items) < 1: |
119 | 119 | LOGGER.error(f'not required to process. No Granules: {self.successful_features.to_dict(False)}') |
120 | 120 | return |
121 | | - self.collection_id = self.successful_features.items[0].collection_id |
| 121 | + self.collection_id = list(set([k.collection_id for k in self.successful_features.items])) |
122 | 122 | return |
123 | 123 |
|
124 | | - def has_collection(self): |
125 | | - uds_collection_result = self.__uds_collection.get_collection(self.collection_id) |
| 124 | + def has_collection(self, collection_id_custom=None): |
| 125 | + collection_id_custom = collection_id_custom if collection_id_custom is not None else self.collection_id |
| 126 | + uds_collection_result = self.__uds_collection.get_collection(collection_id_custom) |
126 | 127 | return len(uds_collection_result) > 0 |
127 | 128 |
|
128 | | - def create_collection(self): |
| 129 | + def create_one_collection(self, collection_id): |
| 130 | + try: |
| 131 | + if collection_id is None: |
| 132 | + raise RuntimeError(f'NULL collection_id') |
| 133 | + if self.has_collection(collection_id): |
| 134 | + LOGGER.debug(f'{collection_id} already exists. continuing..') |
| 135 | + return {'status': 'success'} |
| 136 | + # ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py |
| 137 | + dapa_collection = UnityCollectionStac() \ |
| 138 | + .with_id(collection_id) \ |
| 139 | + .with_graule_id_regex("^test_file.*$") \ |
| 140 | + .with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \ |
| 141 | + .with_title(f'Collection: {collection_id}') \ |
| 142 | + .with_process('stac') \ |
| 143 | + .with_provider(self.__default_provider) \ |
| 144 | + .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \ |
| 145 | + .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \ |
| 146 | + .add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \ |
| 147 | + .add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \ |
| 148 | + .add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata', |
| 149 | + 'item') |
| 150 | + |
| 151 | + stac_collection = dapa_collection.start() |
| 152 | + creation_result = CollectionDapaCreation(stac_collection).create() |
| 153 | + if creation_result['statusCode'] >= 400: |
| 154 | + raise RuntimeError( |
| 155 | + f'failed to create collection: {collection_id}. details: {creation_result["body"]}') |
| 156 | + time.sleep(3) # cool off period before checking DB |
| 157 | + if not self.has_collection(collection_id): |
| 158 | + LOGGER.error(f'missing collection. (failed to create): {collection_id}') |
| 159 | + raise ValueError(f'missing collection. (failed to create): {collection_id}') |
| 160 | + except Exception as e: |
| 161 | + return {'status': 'error', 'details': str(e)} |
| 162 | + return {'status': 'success'} |
| 163 | + |
| 164 | + def create_collection_async(self): |
129 | 165 | if self.collection_id is None: |
130 | 166 | raise RuntimeError(f'NULL collection_id') |
131 | | - if self.has_collection(): |
132 | | - LOGGER.debug(f'{self.collection_id} already exists. continuing..') |
133 | | - return |
134 | | - # ref: https://github.com/unity-sds/unity-py/blob/0.4.0/unity_sds_client/services/data_service.py |
135 | | - dapa_collection = UnityCollectionStac() \ |
136 | | - .with_id(self.collection_id) \ |
137 | | - .with_graule_id_regex("^test_file.*$") \ |
138 | | - .with_granule_id_extraction_regex("(^test_file.*)(\\.nc|\\.nc\\.cas|\\.cmr\\.xml)") \ |
139 | | - .with_title(f'Collection: {self.collection_id}') \ |
140 | | - .with_process('stac') \ |
141 | | - .with_provider(self.__default_provider) \ |
142 | | - .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'unknown_bucket', 'application/json', 'root') \ |
143 | | - .add_file_type("test_file01.nc", "^test_file.*\\.nc$", 'protected', 'data', 'item') \ |
144 | | - .add_file_type("test_file01.nc.cas", "^test_file.*\\.nc.cas$", 'protected', 'metadata', 'item') \ |
145 | | - .add_file_type("test_file01.nc.cmr.xml", "^test_file.*\\.nc.cmr.xml$", 'protected', 'metadata', 'item') \ |
146 | | - .add_file_type("test_file01.nc.stac.json", "^test_file.*\\.nc.stac.json$", 'protected', 'metadata', 'item') |
147 | | - |
148 | | - stac_collection = dapa_collection.start() |
149 | | - creation_result = CollectionDapaCreation(stac_collection).create() |
150 | | - if creation_result['statusCode'] >= 400: |
151 | | - raise RuntimeError(f'failed to create collection: {self.collection_id}. details: {creation_result["body"]}') |
152 | | - time.sleep(3) # cool off period before checking DB |
153 | | - if not self.has_collection(): |
154 | | - LOGGER.error(f'missing collection. (failed to create): {self.collection_id}') |
155 | | - raise ValueError(f'missing collection. (failed to create): {self.collection_id}') |
| 167 | + with ThreadPoolExecutor() as executor: |
| 168 | + futures = [executor.submit(self.create_one_collection, collection_id) for collection_id in self.collection_id] |
| 169 | + results = [future.result() for future in as_completed(futures)] |
| 170 | + errors = [k['details'] for k in results if k['status'] == 'error'] |
| 171 | + if len(errors) > 0: |
| 172 | + raise ValueError(f'error while creating collections: {errors}') |
156 | 173 | return |
157 | 174 |
|
158 | 175 | def send_cnm_msg(self): |
@@ -188,6 +205,6 @@ def start(self, event): |
188 | 205 | self.load_successful_features_s3(s3_url) |
189 | 206 | self.validate_granules() |
190 | 207 | self.extract_collection_id() |
191 | | - self.create_collection() |
| 208 | + self.create_collection_async() |
192 | 209 | self.send_cnm_msg() |
193 | 210 | return |
0 commit comments