[develop] First implementation of run_WE2E_tests.py #558

Merged
Commits (45 total; changes shown from 31 commits)
6846d25
Initial version, takes file with list of tests and checks they are al…
mkavulich Nov 10, 2022
f9fdd50
Add argparse
mkavulich Nov 10, 2022
24e3b63
- Add more command line options, format help message better
mkavulich Nov 10, 2022
a912c39
Use direct calls to logging, use args.debug and logging.debug to mana…
mkavulich Nov 22, 2022
17de277
Add more command line arguments from original script, update config.y…
mkavulich Nov 22, 2022
d3cca9f
Almost ready to call generate_FV3LAM_wflow! Added some section-specif…
mkavulich Nov 30, 2022
e7ed371
SUCCESS! Script now runs on a hard-coded test input file. Next step is
mkavulich Jan 17, 2023
b36bb7d
Implemented test specification as a single, unified command-line argu…
mkavulich Dec 3, 2022
8ae8327
Fix test filename bug
mkavulich Jan 17, 2023
2e614e3
Updates to logging to ensure log files are written correctly and in the
mkavulich Jan 18, 2023
7d7501f
Improvements to workflow generation
mkavulich Jan 19, 2023
3cce1d7
Finish up initial version of run_WE2E_tests.py, start work on
mkavulich Jan 19, 2023
2274c0d
Continue to populate monitor_jobs.py
mkavulich Jan 20, 2023
3d53ed7
monitor_jobs.py nearly complete!
mkavulich Jan 20, 2023
ef072f9
If RUN_TASK_GET_EXTRN_ICS and/or RUN_TASK_GET_EXTRN_LBCS are false, d…
mkavulich Jan 21, 2023
d2a4cac
Ladies and Gentlemen and Smizmars...we have a working prototype!
mkavulich Jan 21, 2023
18e1355
Fixing bug in specify_template_filenames test: files were re-named in…
mkavulich Jan 21, 2023
39083c7
Address lists correctly so --tests argument works correctly
mkavulich Jan 21, 2023
c62c8ce
Revert "If RUN_TASK_GET_EXTRN_ICS and/or RUN_TASK_GET_EXTRN_LBCS are …
mkavulich Jan 21, 2023
374ed9e
Remove sections "task_get_extrn_ics" and "task_get_extrn_lbcs" from c…
mkavulich Jan 21, 2023
f439b63
In monitor_jobs, need to make a copy of running_expts explicitly ( = …
mkavulich Jan 21, 2023
e48fc4b
Script now runs most of Cheyenne's fundamental tests...still need to …
mkavulich Jan 21, 2023
3774fa6
Make things a bit more verbose, especially for debug mode
mkavulich Jan 23, 2023
ee995fb
Implement command-line submission of monitor_jobs.py. This is
mkavulich Jan 23, 2023
e447cba
Had more discussion with Chris Harrop about nature of rocotorun issue…
mkavulich Jan 24, 2023
64944eb
Improvements to run_WE2E_tests.py
mkavulich Jan 24, 2023
213720d
More sprucing up of the new script
mkavulich Jan 24, 2023
786ad00
add 'quiet' option to suppress test generation output
mkavulich Jan 24, 2023
05c06b7
Remove some old debug prints
mkavulich Jan 24, 2023
d0c7a17
Cleanup from pylint
mkavulich Jan 24, 2023
faa711a
Forgot to add example monitor_jobs.yaml
mkavulich Jan 25, 2023
f6bd027
Merge remote-tracking branch 'origin/develop' into feature/issue_462_…
mkavulich Jan 26, 2023
cc9ffb6
Revert changes to config.specify_template_filenames.yaml. This will s…
mkavulich Jan 27, 2023
f4a0b0a
Add check that there are no duplicate test file names
mkavulich Jan 31, 2023
3145ee0
Consolidate rocotorun_cmd into a defined variable for clarity/compact…
mkavulich Jan 31, 2023
70a477d
Fix bug with debug flag causing pernicious I/O errors
mkavulich Feb 1, 2023
74d9608
Format improvement suggested by Christina
mkavulich Feb 1, 2023
dd86024
Merge commit '25418e86315c1df1e20ca2fd7e096124c28efec4' into feature/…
mkavulich Feb 1, 2023
61c5940
Address more review comments:
mkavulich Feb 1, 2023
cfaca28
Convert --machine argument to lowercase, use os.path.join for paths
mkavulich Feb 1, 2023
3dee9a3
Forgot to commit this fix for bad directory name
mkavulich Feb 1, 2023
8378507
In monitor_jobs.py:
mkavulich Feb 1, 2023
5bcd011
Per review comments, use .get() method in dictionaries to simplify logic
mkavulich Feb 1, 2023
cbcfe5c
Address more review comments
mkavulich Feb 1, 2023
a6e6608
Address final (?) reviewer comments
mkavulich Feb 1, 2023
280 changes: 280 additions & 0 deletions tests/WE2E/monitor_jobs.py
@@ -0,0 +1,280 @@
#!/usr/bin/env python3

import sys
import argparse
import logging
import subprocess
import sqlite3
import time
from textwrap import dedent
from datetime import datetime
from contextlib import closing

sys.path.append("../../ush")

from python_utils import (
load_config_file,
cfg_to_yaml_str
)

from check_python_version import check_python_version


def monitor_jobs(expt_dict: dict, monitor_file: str = '', debug: bool = False) -> str:
"""Function to monitor and run jobs for the specified experiment using Rocoto

Args:
expt_dict (dict): A dictionary containing the information needed to run
one or more experiments. See example file monitor_jobs.yaml
monitor_file (str): [optional] Name of the file used for tracking experiment status; if not given, a default name is generated
debug (bool): [optional] Enable extra output for debugging
Returns:
monitor_file (str): The name of the file used for job monitoring (when the script
finishes, this contains the results/summary)

"""

starttime = datetime.now()
# Write monitor_file, which will contain information on each monitored experiment
if not monitor_file:
monitor_file = f'monitor_jobs_{starttime.strftime("%Y%m%d%H%M%S")}.yaml'
logging.info(f"Writing information for all experiments to {monitor_file}")

write_monitor_file(monitor_file,expt_dict)

# Perform initial setup for each experiment
logging.info("Checking tests available for monitoring...")
num_expts = 0
for expt in expt_dict:
logging.info(f"Starting experiment {expt} running")
num_expts += 1
rocoto_db = f"{expt_dict[expt]['expt_dir']}/FV3LAM_wflow.db"
subprocess.run(["rocotorun", f"-w {expt_dict[expt]['expt_dir']}/FV3LAM_wflow.xml", f"-d {rocoto_db}"])
logging.debug(f"Reading database for experiment {expt}, populating experiment dictionary")
try:
db = sqlite_read(rocoto_db,'SELECT taskname,cycle,state from jobs')
except:
logging.warning(f"Unable to read database {rocoto_db}\nWill not track experiment {expt}")
expt_dict[expt]["status"] = "ERROR"
num_expts -= 1
continue
for task in db:
# For each entry from rocoto database, store that under a dictionary key named TASKNAME_CYCLE
# Cycle comes from the database in Unix Time (seconds), so convert to human-readable
cycle = datetime.utcfromtimestamp(task[1]).strftime('%Y%m%d%H%M')
expt_dict[expt][f"{task[0]}_{cycle}"] = task[2]
Collaborator:
task[2] here depends on your SELECT statement never changing. It might be helpful for the user if you made a named tuple to hold the information in the task so that you don't have to worry about order here and losing track.

Collaborator (author):
I do not see a way to convert the output from fetchall() to a list of namedtuples without significantly increasing the complexity of the code. I'd rather just handle this with a comment.
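For reference, a minimal sketch of the suggested approach (assuming the same three-column SELECT; the merged code instead keeps positional indexing with an explanatory comment):

# Sketch only: wrap each row returned by the SELECT in a named tuple so fields
# are accessed by name instead of position.
from collections import namedtuple

Task = namedtuple("Task", ["taskname", "cycle", "state"])

rows = sqlite_read(rocoto_db, 'SELECT taskname,cycle,state from jobs')
for task in (Task(*row) for row in rows):
    cycle = datetime.utcfromtimestamp(task.cycle).strftime('%Y%m%d%H%M')
    expt_dict[expt][f"{task.taskname}_{cycle}"] = task.state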

expt_dict[expt] = update_expt_status(expt_dict[expt], expt)
#Run rocotorun again to get around rocotobqserver proliferation issue
subprocess.run(["rocotorun", f"-w {expt_dict[expt]['expt_dir']}/FV3LAM_wflow.xml", f"-d {rocoto_db}"])
Collaborator:
Will the calls to subprocess like this wait until rocotorun completes before control is returned to the Python script? If not, you might be over-taxing the system here.

Also, is rocotorun also running in a cron job at this point?

Collaborator (author) @mkavulich, Jan 31, 2023:
subprocess.run waits for the subprocess to complete before continuing (see the Python documentation for subprocess.run). I decided not to try to capture the output from the subprocess because this seems to slow things down drastically (like, orders of magnitude slower).

The monitor_jobs() function is only called by run_WE2E_tests.py if the --use_cron_to_relaunch option is not specified, so this should never be run in conjunction with crontab running. But I can't control what users choose to do; theoretically this should work just fine if someone also had a crontab entry for this job and called this script manually.
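A minimal sketch of the blocking behavior being described (xml_file and db_file are placeholder names for this illustration):

# subprocess.run() blocks: control returns to Python only after rocotorun exits.
result = subprocess.run(["rocotorun", f"-w {xml_file}", f"-d {db_file}"])
print(result.returncode)  # available only once rocotorun has completed

# Capturing output would also block, but was found to slow things down
# drastically in this use case, so it is deliberately not done here:
# subprocess.run(["rocotorun", ...], capture_output=True, text=True)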


write_monitor_file(monitor_file,expt_dict)

logging.info(f'Setup complete; monitoring {num_expts} experiments')

#Make a copy of experiment dictionary; will use this copy to monitor active experiments
running_expts = expt_dict.copy()

i = 0
while running_expts:
i += 1
for expt in running_expts.copy():
logging.debug(f"Updating status of {expt}")
rocoto_db = f"{running_expts[expt]['expt_dir']}/FV3LAM_wflow.db"
try:
db = sqlite_read(rocoto_db,'SELECT taskname,cycle,state from jobs')
except:
logging.warning(f"Unable to read database {rocoto_db}\nWill not track experiment {expt}")
expt_dict[expt]["status"] = "ERROR"
continue
# Run "rocotorun" here to give the database time to be fully written
subprocess.run(["rocotorun", f"-w {expt_dict[expt]['expt_dir']}/FV3LAM_wflow.xml", f"-d {rocoto_db}"])
#Run rocotorun again to get around rocotobqserver proliferation issue
subprocess.run(["rocotorun", f"-w {expt_dict[expt]['expt_dir']}/FV3LAM_wflow.xml", f"-d {rocoto_db}"])
for task in db:
# For each entry from rocoto database, store that under a dictionary key named TASKNAME_CYCLE
# Cycle comes from the database in Unix Time (seconds), so convert to human-readable
cycle = datetime.utcfromtimestamp(task[1]).strftime('%Y%m%d%H%M')
expt_dict[expt][f"{task[0]}_{cycle}"] = task[2]
expt_dict[expt] = update_expt_status(expt_dict[expt], expt)
running_expts[expt] = expt_dict[expt]
if running_expts[expt]["status"] in ['DEAD','ERROR','COMPLETE']:
logging.info(f'Experiment {expt} is {running_expts[expt]["status"]}; will no longer monitor.')
running_expts.pop(expt)
continue
logging.debug(f'Experiment {expt} status is {expt_dict[expt]["status"]}')


write_monitor_file(monitor_file,expt_dict)
endtime = datetime.now()
total_walltime = endtime - starttime

logging.debug(f"Finished loop {i}\nWalltime so far is {str(total_walltime)}")

#Slow things down just a tad between loops so experiments behave better
time.sleep(5)


endtime = datetime.now()
total_walltime = endtime - starttime

logging.info(f'All {num_expts} experiments finished in {str(total_walltime)}')

return monitor_file


def update_expt_status(expt: dict, name: str) -> dict:
"""
This function reads the dictionary showing the status of a given experiment (as read from the
rocoto database file) and uses a simple set of rules to combine the statuses of every task into
a useful "status" for the whole experiment.

Experiment "status" levels explained:
CREATED: The experiment has been created, but the monitor script has not yet processed it.
This is immediately overwritten at the beginning of the "monitor_jobs" function, so we
should never see this status in this function. Included just for completeness' sake.
SUBMITTING: All jobs are in status SUBMITTING or SUCCEEDED. This is a normal state; we will
continue to monitor this experiment.
DYING: One or more tasks have died (status "DEAD"), so this experiment has had an error.
We will continue to monitor this experiment until all tasks are either status DEAD or
status SUCCEEDED (see next entry).
DEAD: One or more tasks are at status DEAD, and the rest are either DEAD or SUCCEEDED. We
will no longer monitor this experiment.
ERROR: One or more tasks are at status UNKNOWN, meaning that rocoto has failed to track the
job associated with that task. This will require manual intervention to solve, so we
will no longer monitor this experiment.
This status may also appear if we fail to read the rocoto database file.
RUNNING: One or more jobs are at status RUNNING, and the rest are either status QUEUED,
SUBMITTING, or SUCCEEDED. This is a normal state; we will continue to monitor this experiment.
QUEUED: One or more jobs are at status QUEUED, and some others may be at status SUBMITTING or
SUCCEEDED. This is a normal state; we will continue to monitor this experiment.
SUCCEEDED: All jobs are status SUCCEEDED; we will monitor for one more cycle in case there are
unsubmitted jobs remaining.
COMPLETE: All jobs are status SUCCEEDED, and we have monitored this experiment for an additional
cycle to ensure there are no unsubmitted jobs. We will no longer monitor this experiment.
"""

#If we are no longer tracking this experiment, return unchanged
if expt["status"] in ['DEAD','ERROR','COMPLETE']:
return expt
statuses = list()
for task in expt:
# Skip non-task entries
if task in ["expt_dir","status"]:
continue
statuses.append(expt[task])

if "DEAD" in statuses:
if ( "RUNNING" in statuses ) or ( "SUBMITTING" in statuses ) or ( "QUEUED" in statuses ):
expt["status"] = "DYING"
else:
expt["status"] = "DEAD"
return expt

if "UNKNOWN" in statuses:
expt["status"] = "ERROR"

if "RUNNING" in statuses:
expt["status"] = "RUNNING"
elif "QUEUED" in statuses:
expt["status"] = "QUEUED"
elif "SUBMITTING" in statuses:
expt["status"] = "SUBMITTING"
elif "SUCCEEDED" in statuses:
if expt["status"] == "SUCCEEDED":
expt["status"] = "COMPLETE"
else:
expt["status"] = "SUCCEEDED"
else:
logging.fatal("Some kind of horrible thing has happened")
raise ValueError(dedent(f"""Some kind of horrible thing has happened to the experiment status
for experiment {name}
status is {expt["status"]}
all task statuses are {statuses}"""))

return expt

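For illustration, here is how the rules above would combine for a small experiment dictionary (a sketch only; the directory and task names are hypothetical):

# Hypothetical input: one succeeded task, one running, one queued.
expt = {
    "expt_dir": "/some_directory/expt_dirs/custom_ESGgrid",
    "status": "SUBMITTING",
    "make_grid_201907010000": "SUCCEEDED",
    "make_ics_201907010000": "RUNNING",
    "make_lbcs_201907010000": "QUEUED",
}
expt = update_expt_status(expt, "custom_ESGgrid")
print(expt["status"])  # "RUNNING": no DEAD or UNKNOWN tasks, and at least one task is RUNNING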

def write_monitor_file(monitor_file: str, expt_dict: dict):
try:
with open(monitor_file,"w") as f:
f.write("### WARNING ###\n")
f.write("### THIS FILE IS AUTO_GENERATED AND REGULARLY OVER-WRITTEN BY monitor_jobs.py\n")
f.write("### EDITS MAY RESULT IN MISBEHAVIOR OF EXPERIMENTS RUNNING\n")
f.writelines(cfg_to_yaml_str(expt_dict))
except:
logging.fatal("\n********************************\n")
logging.fatal(f"WARNING WARNING WARNING\nFailure occurred while writing monitor file {monitor_file}")
logging.fatal("File may be corrupt or invalid for re-run!!")
logging.fatal("\n********************************\n")
raise

def sqlite_read(db: str, ex: str) -> list:
# # Create Named Tuple for better data organization
# Task = namedtuple("taskname","cycle","state")

with closing(sqlite3.connect(db)) as connection:
with closing(connection.cursor()) as cur:
db = cur.execute(ex).fetchall()
return db
Collaborator:
Any chance this is a problem if a user calls this script for a real-time experiment whose first cycle is a LONG time ago and the cycles since then are all inactive?

Collaborator (author):
This script is not designed with real-time experiments in mind, just the WE2E tests. If realtime functionality is desired and the current logic is insufficient then those considerations can be taken into account in future updates.



def setup_logging(logfile: str = "log.run_WE2E_tests", debug: bool = False) -> None:
"""
Sets up logging, printing high-priority (INFO and higher) messages to screen, and printing all
messages with detailed timing and routine info in the specified text file.
"""
logging.getLogger().setLevel(logging.DEBUG)

formatter = logging.Formatter("%(name)-16s %(levelname)-8s %(message)s")

fh = logging.FileHandler(logfile, mode='w')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logging.getLogger().addHandler(fh)

logging.debug(f"Finished setting up debug file logging in {logfile}")
console = logging.StreamHandler()
if debug:
console.setLevel(logging.DEBUG)
else:
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)
logging.debug("Logging set up successfully")


if __name__ == "__main__":

check_python_version()

logfile='log.monitor_jobs'

#Parse arguments
parser = argparse.ArgumentParser(description="Script for monitoring and running jobs in a specified experiment, as specified in a yaml configuration file\n")

parser.add_argument('-y', '--yaml_file', type=str, help='YAML-format file specifying the information of jobs to be run; for an example file, see monitor_jobs.yaml', required=True)
parser.add_argument('-d', '--debug', action='store_true', help='Script will be run in debug mode with more verbose output')

args = parser.parse_args()

setup_logging(logfile,args.debug)

expt_dict = load_config_file(args.yaml_file)

#Call main function

try:
monitor_jobs(expt_dict,args.yaml_file, args.debug)
except:
logging.exception(
dedent(
f"""
*********************************************************************
FATAL ERROR:
An error occurred. See the error message(s) printed below.
For more detailed information, check the log file from the workflow
generation script: {logfile}
*********************************************************************\n
"""
)
)
54 changes: 54 additions & 0 deletions tests/WE2E/monitor_jobs.yaml
@@ -0,0 +1,54 @@
# This is an example yaml file showing the various entries that can be created for tracking jobs by monitor_jobs.py
Collaborator:
How should this file be updated when changes are made to monitor_jobs.py such that this file is modified?

Can that be a GitHub action?

Collaborator (author):
It is unclear to me how that would be handled in an automatic way. Is it worth the effort for something that will likely not change very often, if at all?

Collaborator:
It could be nice just to drop in a comment on how this appeared here so that future you is not completely baffled. ;)

# Any valid file created by monitor_jobs.py (unless corrupted) can be re-submitted for continued tracking if any
# experiments are yet to be completed.
# If an experiment with status: COMPLETE, DEAD, or ERROR is read by monitor_jobs.py, it will be ignored.
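# For example, a previously generated file could be re-submitted for continued tracking with
# (hypothetical file name, following the auto-generated monitor_jobs_YYYYMMDDHHMMSS.yaml pattern):
#   python monitor_jobs.py -y monitor_jobs_20230201120000.yaml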
#First example: an experiment that has been created by generate_FV3LAM_workflow.py but has not yet started running
custom_ESGgrid:
expt_dir: /some_directory/expt_dirs/custom_ESGgrid
status: CREATED
#Second example: an experiment that has just been submitted
custom_ESGgrid:
expt_dir: /some_directory/expt_dirs/custom_ESGgrid
status: SUBMITTING
make_grid_201907010000: SUBMITTING
get_extrn_ics_201907010000: SUBMITTING
get_extrn_lbcs_201907010000: SUBMITTING
#Third example: an experiment with a mix of successful and running tasks
custom_ESGgrid:
expt_dir: /some_directory/expt_dirs/custom_ESGgrid
status: RUNNING
make_grid_201907010000: SUCCEEDED
get_extrn_ics_201907010000: SUCCEEDED
get_extrn_lbcs_201907010000: SUCCEEDED
make_orog_201907010000: SUCCEEDED
make_sfc_climo_201907010000: SUCCEEDED
make_ics_201907010000: RUNNING
make_lbcs_201907010000: RUNNING
#Fourth example: an experiment that has completed successfully
custom_ESGgrid:
expt_dir: /some_directory/expt_dirs/custom_ESGgrid
status: COMPLETE
make_grid_201907010000: SUCCEEDED
get_extrn_ics_201907010000: SUCCEEDED
get_extrn_lbcs_201907010000: SUCCEEDED
make_orog_201907010000: SUCCEEDED
make_sfc_climo_201907010000: SUCCEEDED
make_ics_201907010000: SUCCEEDED
make_lbcs_201907010000: SUCCEEDED
run_fcst_201907010000: SUCCEEDED
run_post_f000_201907010000: SUCCEEDED
run_post_f001_201907010000: SUCCEEDED
run_post_f002_201907010000: SUCCEEDED
run_post_f003_201907010000: SUCCEEDED
run_post_f004_201907010000: SUCCEEDED
run_post_f005_201907010000: SUCCEEDED
run_post_f006_201907010000: SUCCEEDED
#Fifth example: an experiment that has died due to a failed task.
custom_ESGgrid:
expt_dir: /some_directory/expt_dirs/custom_ESGgrid
status: DEAD
make_grid_201907010000: SUCCEEDED
get_extrn_ics_201907010000: SUCCEEDED
get_extrn_lbcs_201907010000: SUCCEEDED
make_orog_201907010000: DEAD
