diff --git a/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py b/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
index 4b9e09c..529fc00 100644
--- a/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
+++ b/mdps_ds_lib/stage_in_out/upload_granules_by_complete_catalog_s3.py
@@ -50,8 +50,22 @@ def validate_job(self, job_obj):
     #     return
 
     def __exec_actual_job(self, each_child, lock) -> bool:
-        current_granule_stac: Item = self.__gc.get_granules_item(each_child)
-        current_collection_id = current_granule_stac.collection_id.strip()
+        try:
+            current_granule_stac: Item = self.__gc.get_granules_item(each_child)
+            current_collection_id = current_granule_stac.collection_id.strip()
+        except Exception as e:
+            LOGGER.exception(f'error while processing: {each_child}')
+            error_item = Item(id='unknown',
+                              properties={'message': 'unknown error', 'granule': each_child, 'details': str(e)},
+                              geometry={
+                                  "type": "Point",
+                                  "coordinates": [0.0, 0.0]
+                              },
+                              bbox=[0.0, 0.0, 0.0, 0.0],
+                              datetime=TimeUtils().parse_from_unix(0, True).get_datetime_obj(),
+                              collection='unknown')
+            self.__error_list.put(error_item.to_dict(False, False))
+            return True
         try:
             current_collection_id = GranulesCatalog.get_unity_formatted_collection_id(current_collection_id, self.__project_venue_set)
             LOGGER.debug(f'reformatted current_collection_id: {current_collection_id}')
@@ -211,14 +225,19 @@ def __actual_upload(self):
         FileUtils.write_json(failed_features_file, failed_item_collections.to_dict(False))
         if len(failed_item_collections.items) > 0:
             LOGGER.fatal(f'One or more Failures: {failed_item_collections.to_dict(False)}')
+
+        LOGGER.debug(f'creating response catalog')
+        catalog_json = GranulesCatalog().update_catalog(catalog_file_path, [successful_features_file, failed_features_file])
+        LOGGER.debug(f'catalog_json: {catalog_json}')
+        if len(successful_item_collections) < 1: # TODO check this.
+            LOGGER.debug(f'No successful items in Upload: Not uploading successful_features_ to s3://{self._staging_bucket}/{self._result_path_prefix}')
+            return json.dumps(catalog_json)
+
         s3_url = self.__s3.upload(successful_features_file, self._staging_bucket,
                                   self._result_path_prefix,
                                   s3_name=f'successful_features_{TimeUtils.get_current_time()}.json',
                                   delete_files=self._delete_files)
         LOGGER.debug(f'uploaded successful features to S3: {s3_url}')
-        LOGGER.debug(f'creating response catalog')
-        catalog_json = GranulesCatalog().update_catalog(catalog_file_path, [successful_features_file, failed_features_file])
-        LOGGER.debug(f'catalog_json: {catalog_json}')
         return json.dumps(catalog_json)
 
     def __exec_dry_run(self):
diff --git a/tests/integration_tests/test_docker_stage_out.py b/tests/integration_tests/test_docker_stage_out.py
index cab7d28..72d2809 100644
--- a/tests/integration_tests/test_docker_stage_out.py
+++ b/tests/integration_tests/test_docker_stage_out.py
@@ -1343,10 +1343,10 @@ def test_03_02_upload_complete_catalog(self):
             result_key_prefix = result_key.split('.')[0]
             self.assertTrue(f'{result_key_prefix}.nc.cas' in upload_result['assets'], f'missing assets#metadata asset: {result_key_prefix}.nc.cas')
             self.assertTrue('href' in upload_result['assets'][f'{result_key_prefix}.nc.cas'], 'missing assets#metadata__cas#href')
-            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc.cas']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA/'))
+            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc.cas']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA'))
             self.assertTrue(f'{result_key_prefix}.nc' in upload_result['assets'], f'missing assets#data: {result_key_prefix}.nc')
             self.assertTrue('href' in upload_result['assets'][f'{result_key_prefix}.nc'], 'missing assets#data#href')
-            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA/'))
+            self.assertTrue(upload_result['assets'][f'{result_key_prefix}.nc']['href'].startswith(f's3://{os.environ["STAGING_BUCKET"]}/URN:NASA:UNITY:{os.environ["PROJECT"]}:{os.environ["VENUE"]}:NA'))
         """
         Example output:
         {
@@ -1383,6 +1383,73 @@ def test_03_02_upload_complete_catalog(self):
             self.assertEqual(len(successful_feature_collection), total_files, f'wrong length: {successful_feature_collection}')
         return
 
+    def test_03_02_upload_complete_catalog_missing_real_files(self):
+        os.environ['VERIFY_SSL'] = 'FALSE'
+        os.environ['PROJECT'] = 'LOCAL'
+        os.environ['VENUE'] = 'UNIT_TEST'
+        os.environ['STAGING_BUCKET'] = 'uds-sbx-cumulus-staging'
+
+        os.environ['GRANULES_SEARCH_DOMAIN'] = 'UNITY'
+        # os.environ['GRANULES_UPLOAD_TYPE'] = 'UPLOAD_S3_BY_STAC_CATALOG'
+        # defaulted to this value
+
+        if len(argv) > 1:
+            argv.pop(-1)
+        argv.append('UPLOAD')
+
+        starting_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
+        with tempfile.TemporaryDirectory() as tmp_dir_name:
+            os.environ['OUTPUT_FILE'] = os.path.join(tmp_dir_name, 'some_output', 'output.json')
+            os.environ['UPLOAD_DIR'] = ''  # not needed
+            os.environ['OUTPUT_DIRECTORY'] = os.path.join(tmp_dir_name, 'output_dir')
+            FileUtils.mk_dir_p(os.environ.get('OUTPUT_DIRECTORY'))
+            os.environ['CATALOG_FILE'] = os.path.join(tmp_dir_name, 'catalog.json')
+            total_files = 10
+            # os.environ['PARALLEL_COUNT'] = str(total_files)
+            granules_dir = os.path.join(tmp_dir_name, 'some_granules')
+            FileUtils.mk_dir_p(granules_dir)
+            catalog = Catalog(
+                id='NA',
+                description='NA')
+            catalog.set_self_href(os.environ['CATALOG_FILE'])
+
+            for i in range(1, total_files+1):
+                filename = f'test_file{i:02d}'
+                catalog.add_link(Link('item', os.path.join('some_granules', f'{filename}.nc.stac.json'), 'application/json'))
+            print(json.dumps(catalog.to_dict(False, False)))
+            with open(os.environ['CATALOG_FILE'], 'w') as ff:
+                ff.write(json.dumps(catalog.to_dict(False, False)))
+
+            upload_result_str = UploadGranulesFactory().get_class(os.getenv('GRANULES_UPLOAD_TYPE', UploadGranulesFactory.UPLOAD_S3_BY_STAC_CATALOG)).upload()
+            upload_result = json.loads(upload_result_str)
+            print(upload_result)
+            """
+            {'type': 'Catalog', 'id': 'NA', 'stac_version': '1.0.0', 'description': 'NA', 'links': [{'rel': 'root', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/catalog.json', 'type': 'application/json'}, {'rel': 'item', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/successful_features.json', 'type': 'application/json'}, {'rel': 'item', 'href': '/var/folders/33/xhq97d6s0dq78wg4h2smw23m0000gq/T/tmprew515jo/failed_features.json', 'type': 'application/json'}]}
+            """
+            self.assertTrue('type' in upload_result, 'missing type')
+            self.assertEqual(upload_result['type'], 'Catalog', 'wrong type')
+            upload_result = Catalog.from_dict(upload_result)
+            child_links = [k.href for k in upload_result.get_links(rel='item')]
+            self.assertEqual(len(child_links), 2, f'wrong length: {child_links}')
+            self.assertTrue(FileUtils.file_exist(child_links[0]), f'missing file: {child_links[0]}')
+            successful_feature_collection = ItemCollection.from_dict(FileUtils.read_json(child_links[0]))
+            successful_feature_collection = list(successful_feature_collection.items)
+            self.assertEqual(len(successful_feature_collection), 0, f'wrong length: {successful_feature_collection}')
+
+            self.assertTrue(FileUtils.file_exist(child_links[1]), f'missing file: {child_links[1]}')
+            failed_feature_collection = ItemCollection.from_dict(FileUtils.read_json(child_links[1]))
+            failed_feature_collection = list(failed_feature_collection.items)
+            self.assertEqual(len(failed_feature_collection), 10, f'wrong length: {failed_feature_collection}')
+
+            s3 = AwsS3()
+            s3_keys = [k for k in s3.get_child_s3_files(os.environ['STAGING_BUCKET'],
+                                                        f"{UploadGranulesByCompleteCatalogS3.DEFAULT_RESULT_PATH_PREFIX}/successful_features_{starting_time}",
+                                                        )]
+            s3_keys = sorted(s3_keys)
+            print(f's3_keys: {s3_keys}')
+            self.assertTrue(len(s3_keys) < 1, f'expected no successful_features uploads in S3: {s3_keys}')
+        return
+
     def test_03_03_upload_auxiliary_files(self):
         temp_collection_id = f'URN:NASA:UNITY:{self.tenant}:{self.tenant_venue}:{self.collection_name}___{self.collection_version}'
         os.environ['GRANULES_UPLOAD_TYPE'] = 'UPLOAD_AUXILIARY_FILE_AS_GRANULE'
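
Note on the error-item fallback in __exec_actual_job: pystac will not serialize an Item whose datetime is None (unless start/end datetimes are set), and a STAC feature needs geometry/bbox keys, which is why the patch fills them with placeholder values. A minimal, self-contained sketch of the same idea using plain pystac, with a stdlib epoch datetime standing in for the project's TimeUtils helper; make_error_item and its arguments are illustrative names, not part of the patch:

from datetime import datetime, timezone

from pystac import Item


def make_error_item(granule_href: str, error: Exception) -> dict:
    # Placeholder geometry/bbox/datetime: dummy values that let the synthetic
    # "error" feature serialize and validate as a regular STAC Item.
    error_item = Item(id='unknown',
                      properties={'message': 'unknown error', 'granule': granule_href, 'details': str(error)},
                      geometry={'type': 'Point', 'coordinates': [0.0, 0.0]},
                      bbox=[0.0, 0.0, 0.0, 0.0],
                      # epoch, like TimeUtils().parse_from_unix(0, True) in the patch
                      datetime=datetime.fromtimestamp(0, tz=timezone.utc),
                      collection='unknown')
    # to_dict(False, False) = include_self_link=False, transform_hrefs=False
    return error_item.to_dict(False, False)


if __name__ == '__main__':
    print(make_error_item('some_granules/test_file01.nc.stac.json', FileNotFoundError('no such file')))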
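
Note on the reordered tail of __actual_upload: update_catalog now runs before the zero-success early return, so an all-failed run still returns a response catalog linking both feature files and only the S3 upload of successful_features is skipped; this is exactly what the new test asserts via its two 'item' links. A hypothetical sketch of the contract the tests rely on, assuming update_catalog simply swaps the catalog's per-granule 'item' links for the two feature files (the real implementation lives in GranulesCatalog; update_catalog_sketch is an illustrative name):

from pystac import Catalog, Link


def update_catalog_sketch(catalog_file_path: str, feature_files: list) -> dict:
    # read the stage-out catalog written earlier in the run
    catalog = Catalog.from_file(catalog_file_path)
    # drop the original per-granule 'item' links ...
    catalog.clear_links('item')
    # ... and point the catalog at the two result files instead,
    # matching the Example output docstring in the new test
    for each_file in feature_files:
        catalog.add_link(Link('item', each_file, 'application/json'))
    return catalog.to_dict(False, False)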