Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add granule list #33

Merged
merged 6 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Added a new input argument *granule-list-file* to input a specific list of granules to process,
and ignore start-date, end-date, cycles, etc
- List can be a list of GranuleURs or granule concept-IDs
### Deprecated
### Removed
### Fixed
Expand Down
7 changes: 7 additions & 0 deletions podaac/hitide_backfill_tool/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ def create_parser():
parser.add_argument("--launchpad-token")
parser.add_argument("--cmr-search-after")

parser.add_argument(
"--granule-list-file",
help=("Process only this list of granuleURs or concept-IDs, and ignore start-date, "
"end-date, cycles, etc."),
default=None
)

parser.add_argument("-g", "--geometry", dest="geometries",
action="append", default=None)
parser.add_argument("--footprint", choices=["on", "off", "force"])
Expand Down
59 changes: 45 additions & 14 deletions podaac/hitide_backfill_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import copy
import json
from datetime import datetime, timezone
import traceback
import requests

from podaac.hitide_backfill_tool.cmr.search import GranuleSearch
Expand Down Expand Up @@ -123,11 +124,10 @@ class Backfiller:
# Disable broad-except since many types of error indicate the absense
# of data when attempting access (e.g. TypeError, IndexError, KeyError, ...)
# pylint: disable=broad-except

# pylint: disable=too-many-instance-attributes,too-many-arguments
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-positional-arguments

def __init__(self, search, message_writer, message_senders, granule_options, logger,
message_limit, cli_execution_id, s3, collection):
message_limit, cli_execution_id, s3, collection, granule_list_file):
# pylint: disable=C0103

# dependencies
Expand All @@ -140,6 +140,8 @@ def __init__(self, search, message_writer, message_senders, granule_options, log
self.cli_execution_id = cli_execution_id
self.s3 = s3
self.collection = collection
self.granule_list_file = granule_list_file
self.granule_list = None

# statistics
self.granules_analyzed = 0
Expand Down Expand Up @@ -178,28 +180,53 @@ def __init__(self, search, message_writer, message_senders, granule_options, log
destination_message.append('nowhere')
self.destination_message = f"Messages being sent to {', '.join(destination_message)}"

if self.granule_list_file:
self.read_granule_list_file()

# for thread-safe operations
self.lock = Lock()

def read_granule_list_file(self):
"""Read the granule_list_file and store contents in array granule_list
The items in the list are one granule per line:
Can be either a GranuleUR or a granlue concept ID, and all items must the same type.
Example:
20240702060501-JPL-L2P_GHRSST-SSTskin-MODIS_A-D-v02.0-fv01.0
20240702083501-JPL-L2P_GHRSST-SSTskin-MODIS_A-N-v02.0-fv01.0
20240702174000-JPL-L2P_GHRSST-SSTskin-MODIS_A-D-v02.0-fv01.0
OR
G3141860732-POCLOUD
G3142056458-POCLOUD
G3142846484-POCLOUD
"""

with open(self.granule_list_file, 'r', encoding='utf-8') as file:
self.granule_list = [line.strip() for line in file]

def process_granules(self):
"""Loop through granules (in parallel) from granule-search and call the process_one_granule() method."""

while self.search.get_next_page():
print("Processing granules...", end='', flush=True)
if self.granule_list:
print('Processing granules from granule list file...', end='', flush=True)
granules = self.search.get_granules_in_list(self.granule_list)

with ThreadPoolExecutor() as executor:
executor.map(self.process_one_granule, self.search.granules())
executor.map(self.process_one_granule, granules)
print("done.")
if self.message_limit_reached():
self.logger.info("\n**** Message limit reached ****")
return
self.log_stats()
else:
while self.search.get_next_page():
print("Processing granules...", end='', flush=True)
with ThreadPoolExecutor() as executor:
executor.map(self.process_one_granule, self.search.granules())
print("done.")
if self.message_limit_reached():
self.logger.info("\n**** Message limit reached ****")
return
self.log_stats()

def print_monthly_results_table(self):
"""Function to print out monthly stats"""

if not self.message_senders:
print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")

print("Monthly Counts Summary:\n")
header = f"{'Date':<10} {'Granules':<10} {'Need Image':<12} {'Need Footprint':<16} {'Both FP & BBox':<16} {'Need DMRPP':<12}"

Expand Down Expand Up @@ -274,6 +301,7 @@ def process_one_granule(self, umm_granule):
self.granules_analyzed += 1
except Exception as exc:
self.logger.error(f"Error: {str(exc)}\n")
traceback.print_exc()

def update_image(self, granule):
"""Create and send messages for one granule's image update."""
Expand Down Expand Up @@ -420,6 +448,9 @@ def log_stats(self):
f"-- {self.destination_message} --\n"
"==============================================================\n"
)
if not self.message_senders:
print("** NOTE: When in preview mode, the messages sent count may not be accurate since it's only simulating sending messages. ** \n")

if len(self.concept_ids_needing_image) > 0:
self.logger.info(f"Granule IDs needing images (showing first 100):\n"
f" {self.concept_ids_needing_image[:100]}\n"
Expand Down Expand Up @@ -483,7 +514,7 @@ def main(args=None):

# setup backfiller
backfiller = Backfiller(search, message_writer, message_senders,
granule_options, logger, args.message_limit, args.cli_execution_id, s3, collection)
granule_options, logger, args.message_limit, args.cli_execution_id, s3, collection, args.granule_list_file)

# Check forge configurations before running backfill
backfiller.get_forge_tig_configuration()
Expand Down
2 changes: 1 addition & 1 deletion podaac/hitide_backfill_tool/cmr/cmr_granule.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class CmrGranule:
# of data when attempting access (e.g. TypeError, IndexError, KeyError, ...)
# pylint: disable=broad-except

# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-instance-attributes,too-many-positional-arguments
# pylint: disable-next=too-many-arguments
def __init__(self,
umm_granule,
Expand Down
66 changes: 65 additions & 1 deletion podaac/hitide_backfill_tool/cmr/search.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
"""Search for CMR granules"""

# pylint: disable=line-too-long

from concurrent.futures import ThreadPoolExecutor
import ast
import json
import logging
import re
from requests import Session
from requests.exceptions import RequestException
from requests.adapters import HTTPAdapter
Expand All @@ -13,7 +17,7 @@
class GranuleSearch:
"""Searches for CMR granules, with paging"""

# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals,too-many-positional-arguments

def __init__(self,
base_url,
Expand Down Expand Up @@ -192,6 +196,66 @@ def pages_loaded(self):
"""Return the total number of granule search pages that have been loaded up to this point"""
return self._pages_loaded

def get_one_granule(self, granule_name):
"""Request a single granule from CMR using granule_name or concept_id"""

url = f"{self._base_url}/search/granules.umm_json?"

# Regex for granule concept id
pattern = r"^G\d{10}-"
url += (f"concept_id={granule_name}" if re.match(pattern, granule_name) else
f"provider={self._provider}&short_name={self._collection_short_name}&readable_granule_name={granule_name}")

headers = {}
if self._edl_token:
headers["Authorization"] = f"Bearer {self._edl_token}"
elif self._launchpad_token:
headers["Authorization"] = self._launchpad_token

body = {}
try:
response = self.session.get(url, headers=headers)
response.raise_for_status()

body = json.loads(response.text)
except RequestException as exc:
self._logger.error(f"Error requesting CMR: {exc}")
except json.JSONDecodeError as exc:
self._logger.error(f"Error decoding CMR JSON response: {exc}")

# Error message if there is a problem
if response.status_code >= 400 or body.get("hits") is None or body.get("items") is None or len(body.get("items")) == 0:
granule_message = f"Granule not found: {granule_name}"
self._logger.error(
f"\nCMR problem:\n"
f"url: {url}\n"
f"----------\n"
f"http_code: {response.status_code}\n"
f"body: {response.text}\n"
f"----------\n"
f"{granule_message}\n"
)
raise Exception("CMR error")

return body["items"][0]

def get_granules_in_list(self, granule_list):
"""Iterate through granule_list, get cmr for each item in parallel, and return a list of umm granule json"""

# pylint: disable=broad-except
def safe_get_granule(granule_name):
"""Safely get one granule, catching exceptions"""
try:
return self.get_one_granule(granule_name)
except Exception:
return None

with ThreadPoolExecutor() as executor:
granules = list(executor.map(safe_get_granule, granule_list))

# Filter out any None values due to exceptions
return [granule for granule in granules if granule is not None]

#
# Helpers
#
Expand Down
3 changes: 1 addition & 2 deletions podaac/hitide_backfill_tool/cnm_message_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class CnmMessageWriter:
"""Creates a Cumulus CNM message from granule."""

# pylint: disable=too-many-instance-attributes,too-many-arguments
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-positional-arguments

def __init__(self,
message_config,
Expand Down Expand Up @@ -89,5 +89,4 @@ def write(self, granule, needs_footprint, needs_image, needs_dmrpp, skip_cmr_ope
"username": self.user
}

print(message["payload"])
return json.dumps(message)
2 changes: 1 addition & 1 deletion podaac/hitide_backfill_tool/config_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def create_defaults(env):
"aws_profile": "ngap-services-sit"
}

raise f"create_defaults({env}) - env must be ops | uat | sit"
raise Exception(f"create_defaults({env}) - env must be ops | uat | sit")


def create_config(args=None):
Expand Down
Loading