Skip to content

Commit

Permalink
fix(pipeline): fixed errors handling null work.parameters and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shinybrar committed Nov 8, 2022
1 parent 811b820 commit b549aa7
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions chime_frb_api/workflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def run(
base_urls: List[str],
site: str,
log_level: str,
) -> None:
"""Run a workflow pipeline.
) -> bool:
r"""Run a workflow pipeline.
Performs the following steps: \n
\t1. Withdraws `Work` from appropriate pipeline.\t\t\t\t
Expand Down Expand Up @@ -139,7 +139,7 @@ def run(
lifetime -= 1
logger.debug(f"Sleeping: {sleep_time} seconds")
time.sleep(sleep_time)
return None
return True


def attempt_work(name: str, user_func: FUNC_TYPE, base_url: str, site: str) -> bool:
Expand Down Expand Up @@ -180,20 +180,29 @@ def attempt_work(name: str, user_func: FUNC_TYPE, base_url: str, site: str) -> b
# If the function is a click command, gather all the default options
defaults: Dict[Any, Any] = {}
if isinstance(user_func, click.Command):
logger.info(f"CLI Detected: {CHECKMARK}")
logger.debug("gathering default options not specified in work.parameters")
logger.info(f"Click CLI Detected: {CHECKMARK}")
logger.debug("Gathering CLI Defaults")
# Get default options from the click command
known: List[Any] = list(work.parameters.keys()) if work.parameters else []
for parameter in user_func.params:
if parameter.name not in work.parameters.keys(): # type: ignore
if parameter.name not in known: # type: ignore
defaults[parameter.name] = parameter.default
if defaults:
logger.info(f"CLI Defaults: {CHECKMARK}")
logger.debug(f"CLI Defaults: {defaults}")
user_func = user_func.callback # type: ignore
else:
logger.info(f"CLI Detected: {CIRCLE}")

# Merge the defaults with the work parameters
parameters: Dict[Any, Any] = {**defaults, **work.parameters} # type: ignore
logger.info(f"Work Parameters: {parameters}")
# If work.parameters is empty, merge an empty dict with the defaults
# Otherwise, merge the work.parameters with the defaults
parameters: Dict[str, Any] = {}
if work.parameters:
parameters = {**work.parameters, **defaults}
else:
parameters = defaults
logger.info(f"Parameters: {CHECKMARK}")
logger.debug(f"Parameters: {parameters}")

# Execute the user function
try:
Expand Down Expand Up @@ -239,7 +248,7 @@ def attempt_work(name: str, user_func: FUNC_TYPE, base_url: str, site: str) -> b
logger.info(f"Work Updated: {CHECKMARK}")
updated = True
break
except requests.RequestException as error:
except requests.RequestException:
logger.debug("retrying work update...")
time.sleep(1)
if not updated:
Expand Down

0 comments on commit b549aa7

Please sign in to comment.