Commit: merge develop
sliu008 committed Nov 7, 2024
2 parents 74be66b + 9a9c523 commit 87ae143
Showing 11 changed files with 299 additions and 173 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Added a new input argument *granule-list-file* to supply a specific list of granules to process,
ignoring start-date, end-date, cycles, etc.
- The list can contain either GranuleURs or granule concept-IDs
- Updated the database instance type from t2.micro to t3.micro
- Made argument *--cumulus-configurations* optional in preview mode
### Deprecated
### Removed
### Fixed
9 changes: 8 additions & 1 deletion podaac/hitide_backfill_tool/args.py
@@ -47,6 +47,13 @@ def create_parser():
parser.add_argument("--launchpad-token")
parser.add_argument("--cmr-search-after")

parser.add_argument(
"--granule-list-file",
help=("Process only this list of granuleURs or concept-IDs, and ignore start-date, "
"end-date, cycles, etc."),
default=None
)

parser.add_argument("-g", "--geometry", dest="geometries",
action="append", default=None)
parser.add_argument("--footprint", choices=["on", "off", "force"])
@@ -57,7 +64,7 @@

parser.add_argument("--cumulus", choices=["ops", "uat", "sit",
"swot-sit", "swot-uat", "swot-ops"])
parser.add_argument("--cumulus-configurations")
parser.add_argument("--cumulus-configurations", default=None)

parser.add_argument("--preview", action="store_true", default=None)
parser.add_argument("--sns-arn")
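For context on the new flag, a minimal sketch of how it surfaces in parsed arguments; the invocation values are hypothetical and assume the parser requires no other arguments:

from podaac.hitide_backfill_tool.args import create_parser

# Hypothetical invocation: a granule list file plus preview mode.
# start-date, end-date, and cycles are omitted because the list file
# takes precedence over them.
args = create_parser().parse_args([
    "--granule-list-file", "granules.txt",
    "--preview",
])
print(args.granule_list_file)       # "granules.txt"
print(args.cumulus_configurations)  # None (now optional in preview mode)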
159 changes: 106 additions & 53 deletions podaac/hitide_backfill_tool/cli.py
@@ -10,6 +10,7 @@
import copy
import json
from datetime import datetime, timezone
import traceback
import requests

from podaac.hitide_backfill_tool.cmr.search import GranuleSearch
@@ -79,11 +80,14 @@ def granule_search_from_args(args, logger):
def message_writer_from_args(args, logger):
"""Return configured message writer from parsed cli args and logger."""

message_config = get_message_config(args.cumulus, args.default_message_config)
collection_config = get_collection_config(
args.cumulus_configurations, args.collection, args.cumulus, logger)
message_writer = CnmMessageWriter(message_config, collection_config,
args.start_date, args.end_date, args.provider, args.cli_execution_id, args.user)
message_writer = None

if args.cumulus_configurations:
message_config = get_message_config(args.cumulus, args.default_message_config)
collection_config = get_collection_config(
args.cumulus_configurations, args.collection, args.cumulus, logger)
message_writer = CnmMessageWriter(message_config, collection_config,
args.start_date, args.end_date, args.provider, args.cli_execution_id, args.user)
return message_writer


@@ -123,11 +127,10 @@ class Backfiller:
# Disable broad-except since many types of error indicate the absence
# of data when attempting access (e.g. TypeError, IndexError, KeyError, ...)
# pylint: disable=broad-except

# pylint: disable=too-many-instance-attributes,too-many-arguments
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-positional-arguments

def __init__(self, search, message_writer, message_senders, granule_options, logger,
message_limit, cli_execution_id, s3, collection):
message_limit, cli_execution_id, s3, collection, granule_list_file):
# pylint: disable=C0103

# dependencies
@@ -140,6 +143,8 @@ def __init__(self, search, message_writer, message_senders, granule_options, log
self.cli_execution_id = cli_execution_id
self.s3 = s3
self.collection = collection
self.granule_list_file = granule_list_file
self.granule_list = None

# statistics
self.granules_analyzed = 0
@@ -178,28 +183,53 @@ def __init__(self, search, message_writer, message_senders, granule_options, log
destination_message.append('nowhere')
self.destination_message = f"Messages being sent to {', '.join(destination_message)}"

if self.granule_list_file:
self.read_granule_list_file()

# for thread-safe operations
self.lock = Lock()

def read_granule_list_file(self):
"""Read the granule_list_file and store contents in array granule_list
The items in the list are one granule per line:
Can be either a GranuleUR or a granlue concept ID, and all items must the same type.
Example:
20240702060501-JPL-L2P_GHRSST-SSTskin-MODIS_A-D-v02.0-fv01.0
20240702083501-JPL-L2P_GHRSST-SSTskin-MODIS_A-N-v02.0-fv01.0
20240702174000-JPL-L2P_GHRSST-SSTskin-MODIS_A-D-v02.0-fv01.0
OR
G3141860732-POCLOUD
G3142056458-POCLOUD
G3142846484-POCLOUD
"""

with open(self.granule_list_file, 'r', encoding='utf-8') as file:
# Skip blank lines so stray newlines don't become empty search terms
self.granule_list = [line.strip() for line in file if line.strip()]
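As a quick illustration, a compatible list file could be produced like this (the path is hypothetical; the concept-IDs reuse the docstring samples):

# Write one concept-ID per line; GranuleURs work the same way,
# as long as the file does not mix the two identifier types.
with open("granules.txt", "w", encoding="utf-8") as file:
    file.write("G3141860732-POCLOUD\n")
    file.write("G3142056458-POCLOUD\n")
    file.write("G3142846484-POCLOUD\n")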

def process_granules(self):
"""Loop through granules (in parallel) from granule-search and call the process_one_granule() method."""

while self.search.get_next_page():
print("Processing granules...", end='', flush=True)
if self.granule_list:
print('Processing granules from granule list file...', end='', flush=True)
granules = self.search.get_granules_in_list(self.granule_list)

with ThreadPoolExecutor() as executor:
executor.map(self.process_one_granule, self.search.granules())
executor.map(self.process_one_granule, granules)
print("done.")
if self.message_limit_reached():
self.logger.info("\n**** Message limit reached ****")
return
self.log_stats()
else:
while self.search.get_next_page():
print("Processing granules...", end='', flush=True)
with ThreadPoolExecutor() as executor:
executor.map(self.process_one_granule, self.search.granules())
print("done.")
if self.message_limit_reached():
self.logger.info("\n**** Message limit reached ****")
return
self.log_stats()

def print_monthly_results_table(self):
"""Function to print out monthly stats"""

if not self.message_senders:
print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")

print("Monthly Counts Summary:\n")
header = f"{'Date':<10} {'Granules':<10} {'Need Image':<12} {'Need Footprint':<16} {'Both FP & BBox':<16} {'Need DMRPP':<12}"

@@ -274,6 +304,7 @@ def process_one_granule(self, umm_granule):
self.granules_analyzed += 1
except Exception as exc:
self.logger.error(f"Error: {str(exc)}\n")
traceback.print_exc()

def update_image(self, granule):
"""Create and send messages for one granule's image update."""
Expand All @@ -288,11 +319,12 @@ def update_image(self, granule):
if not self.message_limit_reached():
with self.lock:
self.image_messages_sent += 1
message = self.message_writer.write(granule, needs_footprint=False,
needs_image=True, needs_dmrpp=False,
skip_cmr_opendap_update=True)
for sender in self.message_senders:
sender.send(message)
if self.message_writer:
message = self.message_writer.write(granule, needs_footprint=False,
needs_image=True, needs_dmrpp=False,
skip_cmr_opendap_update=True)
for sender in self.message_senders:
sender.send(message)
else:
with self.lock:
self.images_that_couldnt_be_processed += 1
Expand All @@ -313,11 +345,12 @@ def update_footprint(self, granule):
if not self.message_limit_reached():
with self.lock:
self.footprint_messages_sent += 1
message = self.message_writer.write(granule, needs_footprint=True,
needs_image=False, needs_dmrpp=False,
skip_cmr_opendap_update=True)
for sender in self.message_senders:
sender.send(message)
if self.message_writer:
message = self.message_writer.write(granule, needs_footprint=True,
needs_image=False, needs_dmrpp=False,
skip_cmr_opendap_update=True)
for sender in self.message_senders:
sender.send(message)
else:
with self.lock:
self.footprints_that_couldnt_be_processed += 1
@@ -370,11 +403,12 @@ def update_dmrpp(self, granule):
if not self.message_limit_reached():
with self.lock:
self.dmrpp_messages_sent += 1
message = self.message_writer.write(granule, needs_footprint=False,
needs_image=False, needs_dmrpp=True,
skip_cmr_opendap_update=skip_cmr_opendap_update)
for sender in self.message_senders:
sender.send(message)
if self.message_writer:
message = self.message_writer.write(granule, needs_footprint=False,
needs_image=False, needs_dmrpp=True,
skip_cmr_opendap_update=skip_cmr_opendap_update)
for sender in self.message_senders:
sender.send(message)
else:
with self.lock:
self.dmrpp_that_couldnt_be_processed += 1
@@ -420,6 +454,9 @@ def log_stats(self):
f"-- {self.destination_message} --\n"
"==============================================================\n"
)
if not self.message_senders:
print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")

if len(self.concept_ids_needing_image) > 0:
self.logger.info(f"Granule IDs needing images (showing first 100):\n"
f" {self.concept_ids_needing_image[:100]}\n"
@@ -452,6 +489,36 @@ def get_forge_tig_configuration(self):
self.forge_tig_configuration = None


def verify_inputs(args, granule_options, message_writer, backfiller):
"""Verify inputs from parsed cli args, and raise an exception if any are invalid."""

if args.cumulus_configurations is None and not args.preview:
raise Exception("Please specify --cumulus-configurations path")

# Check forge configurations before running backfill
backfiller.get_forge_tig_configuration()

if granule_options['footprint_processing'] != "off":
if backfiller.forge_tig_configuration is None:
raise Exception("Cannot find forge tig configuration for this collection")
footprint_settings = backfiller.forge_tig_configuration.get('footprint')
if not footprint_settings:
raise Exception("There is no footprint setting for this collection, please disable footprint for backfilling")

if granule_options['dmrpp_processing'] != "off":
if message_writer is None:
raise Exception("Either disable dmrpp processing or specify --cumulus-configurations path")

files = message_writer.collection_config.get('files', [])
has_dmrpp_regex = False
for file in files:
if file.get('regex', "").endswith(".dmrpp$"):
has_dmrpp_regex = True
break
if has_dmrpp_regex is False:
raise Exception(f"There is no DMRPP regex in cumulus collection configuration for {message_writer.collection_name}")


def main(args=None):
"""Main script for backfilling from the cli"""

@@ -483,27 +550,13 @@ def main(args=None):

# setup backfiller
backfiller = Backfiller(search, message_writer, message_senders,
granule_options, logger, args.message_limit, args.cli_execution_id, s3, collection)

# Check forge configurations before running backfill
backfiller.get_forge_tig_configuration()

if granule_options['footprint_processing'] != "off":
if backfiller.forge_tig_configuration is None:
raise Exception("There is no footprint settings for this collection, please disable footprint for backfilling")
footprint_settings = backfiller.forge_tig_configuration.get('footprint')
if not footprint_settings:
raise Exception("There is no footprint settings for this collection, please disable footprint for backfilling")
granule_options, logger, args.message_limit, args.cli_execution_id, s3, collection, args.granule_list_file)

if granule_options['dmrpp_processing'] != "off":
files = message_writer.collection_config.get('files', [])
has_dmrpp_regex = False
for file in files:
if file.get('regex', "").endswith(".dmrpp$"):
has_dmrpp_regex = True
break
if has_dmrpp_regex is False:
raise Exception(f"There is no DMRPP regex in cumulus collection configuration for {message_writer.collection_name}")
try:
verify_inputs(args, granule_options, message_writer, backfiller)
except Exception as exc:
logger.error(exc)
return

# run backfiller
try:
2 changes: 1 addition & 1 deletion podaac/hitide_backfill_tool/cmr/cmr_granule.py
@@ -11,7 +11,7 @@ class CmrGranule:
# of data when attempting access (e.g. TypeError, IndexError, KeyError, ...)
# pylint: disable=broad-except

# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-instance-attributes,too-many-positional-arguments
# pylint: disable-next=too-many-arguments
def __init__(self,
umm_granule,
66 changes: 65 additions & 1 deletion podaac/hitide_backfill_tool/cmr/search.py
@@ -1,8 +1,12 @@
"""Search for CMR granules"""

# pylint: disable=line-too-long

from concurrent.futures import ThreadPoolExecutor
import ast
import json
import logging
import re
from requests import Session
from requests.exceptions import RequestException
from requests.adapters import HTTPAdapter
@@ -13,7 +17,7 @@
class GranuleSearch:
"""Searches for CMR granules, with paging"""

# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals,too-many-positional-arguments

def __init__(self,
base_url,
@@ -192,6 +196,66 @@ def pages_loaded(self):
"""Return the total number of granule search pages that have been loaded up to this point"""
return self._pages_loaded

def get_one_granule(self, granule_name):
"""Request a single granule from CMR using granule_name or concept_id"""

url = f"{self._base_url}/search/granules.umm_json?"

# Regex for granule concept id
pattern = r"^G\d{10}-"
url += (f"concept_id={granule_name}" if re.match(pattern, granule_name) else
f"provider={self._provider}&short_name={self._collection_short_name}&readable_granule_name={granule_name}")

headers = {}
if self._edl_token:
headers["Authorization"] = f"Bearer {self._edl_token}"
elif self._launchpad_token:
headers["Authorization"] = self._launchpad_token

body = {}
response = None
try:
response = self.session.get(url, headers=headers)
response.raise_for_status()

body = json.loads(response.text)
except RequestException as exc:
self._logger.error(f"Error requesting CMR: {exc}")
except json.JSONDecodeError as exc:
self._logger.error(f"Error decoding CMR JSON response: {exc}")

# Error message if there is a problem (response stays None when the request itself failed)
if response is None or response.status_code >= 400 or body.get("hits") is None or not body.get("items"):
granule_message = f"Granule not found: {granule_name}"
self._logger.error(
f"\nCMR problem:\n"
f"url: {url}\n"
f"----------\n"
f"http_code: {response.status_code if response is not None else 'no response'}\n"
f"body: {response.text if response is not None else ''}\n"
f"----------\n"
f"{granule_message}\n"
)
raise Exception("CMR error")

return body["items"][0]

def get_granules_in_list(self, granule_list):
"""Iterate through granule_list, get cmr for each item in parallel, and return a list of umm granule json"""

# pylint: disable=broad-except
def safe_get_granule(granule_name):
"""Safely get one granule, catching exceptions"""
try:
return self.get_one_granule(granule_name)
except Exception:
return None

with ThreadPoolExecutor() as executor:
granules = list(executor.map(safe_get_granule, granule_list))

# Filter out any None values due to exceptions
return [granule for granule in granules if granule is not None]
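A hedged usage sketch, assuming a configured GranuleSearch instance named search (construction elided):

granule_list = [
    "G3141860732-POCLOUD",
    "G3142056458-POCLOUD",
]
granules = search.get_granules_in_list(granule_list)
# Items CMR could not resolve are dropped, so len(granules) may be
# smaller than len(granule_list).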

#
# Helpers
#
Expand Down