diff --git a/cime/machines-acme/config_machines.xml b/cime/machines-acme/config_machines.xml
index fb071429cfe6..3169d8e79998 100644
--- a/cime/machines-acme/config_machines.xml
+++ b/cime/machines-acme/config_machines.xml
@@ -298,7 +298,7 @@
jgfouca at sandia dot gov
4
- <MAX_TASKS_PER_NODE>8</MAX_TASKS_PER_NODE>
+ <MAX_TASKS_PER_NODE>16</MAX_TASKS_PER_NODE>
1
TRUE
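
Note: the value changed in this hunk appears (from the surrounding create_test changes, which read MAX_TASKS_PER_NODE) to be the per-node task count this machine advertises. With the 1.25 oversubscription factor used in CreateTest.__init__ later in this patch, the before/after pool sizes work out as in this illustrative sketch:

    # Illustrative pool-sizing arithmetic; the 1.25 factor comes from
    # CreateTest.__init__ in create_test_impl.py below
    for pes in (8, 16):                   # value before / after this hunk
        print pes, "->", int(pes * 1.25)  # prints "8 -> 10" and "16 -> 20"
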
diff --git a/cime/scripts-acme/bless_test_results b/cime/scripts-acme/bless_test_results
index c937507fd211..180dc2b3c644 100755
--- a/cime/scripts-acme/bless_test_results
+++ b/cime/scripts-acme/bless_test_results
@@ -148,8 +148,8 @@ def bless_test_results(baseline_name, test_root, compiler, namelists_only=False,
expect(create_test_impl.NAMELIST_PHASE in test_result,
"Test '%s' had no namelist phase" % test_name)
- run_phase_pass = test_result[wait_for_tests.RUN_PHASE] == wait_for_tests.TEST_PASSED_STATUS
- nl_pass = test_result[create_test_impl.NAMELIST_PHASE] == wait_for_tests.TEST_PASSED_STATUS
+ run_phase_pass = test_result[wait_for_tests.RUN_PHASE] == wait_for_tests.TEST_PASS_STATUS
+ nl_pass = test_result[create_test_impl.NAMELIST_PHASE] == wait_for_tests.TEST_PASS_STATUS
if (not run_phase_pass):
warning("Test '%s' did not run successfully, it is not safe to bless results" % test_name)
@@ -157,7 +157,7 @@ def bless_test_results(baseline_name, test_root, compiler, namelists_only=False,
else:
expect(wait_for_tests.HIST_COMPARE_PHASE in test_result,
"Test '%s' had no history compare phase" % test_name)
- hist_pass = test_result[wait_for_tests.HIST_COMPARE_PHASE] == wait_for_tests.TEST_PASSED_STATUS
+ hist_pass = test_result[wait_for_tests.HIST_COMPARE_PHASE] == wait_for_tests.TEST_PASS_STATUS
if ( (nl_pass and hist_pass) or (nl_pass and namelists_only) or (hist_pass and hist_only) ):
print "Nothing to bless for test:", test_name, " overall status:", overall_result
diff --git a/cime/scripts-acme/create_test b/cime/scripts-acme/create_test
index 77c874faa0b7..3123837eb322 100755
--- a/cime/scripts-acme/create_test
+++ b/cime/scripts-acme/create_test
@@ -111,6 +111,10 @@ formatter_class=argparse.ArgumentDefaultsHelpFormatter
"If no testid is specified, then a time stamp will be"
"used.")
+ parser.add_argument("-j", "--parallel-jobs", type=int, default=None,
+ help="Number of tasks create_test should perform simultaneously. Default "
+ "will be min(num_cores, num_tests).")
+
parser.add_argument("--old", action="store_true", help="Use CIME Perl impl")
args = parser.parse_args(args[1:])
@@ -123,6 +127,9 @@ formatter_class=argparse.ArgumentDefaultsHelpFormatter
"Provided baseline name but did not specify compare or generate")
expect(not (args.namelists_only and not (args.generate or args.compare)),
"Must provide either --compare or --generate with --namelists-only")
+ if (args.parallel_jobs is not None):
+ expect(args.parallel_jobs > 0,
+ "Invalid value for parallel_jobs: %d" % args.parallel_jobs)
if (args.no_build):
args.no_run = True
@@ -163,7 +170,7 @@ formatter_class=argparse.ArgumentDefaultsHelpFormatter
args.test_id = acme_util.get_utc_timestamp()
return args.testargs, args.compiler, args.no_run, args.no_build, args.no_batch, args.test_root, args.baseline_root, \
- args.clean, args.compare, args.generate, args.baseline_name, args.namelists_only, args.project, args.test_id, args.old
+ args.clean, args.compare, args.generate, args.baseline_name, args.namelists_only, args.project, args.test_id, args.old, args.parallel_jobs
###############################################################################
def get_tests_from_args(testargs, machine, compiler):
@@ -205,12 +212,15 @@ def get_tests_from_args(testargs, machine, compiler):
###############################################################################
def create_test(testargs, compiler, no_run, no_build, no_batch, test_root,
baseline_root, clean, compare, generate,
- baseline_name, namelists_only, project, test_id, old):
+ baseline_name, namelists_only, project, test_id, old, parallel_jobs):
###############################################################################
machine = acme_util.probe_machine_name()
tests_to_run = get_tests_from_args(testargs, machine, compiler)
+ if (parallel_jobs is None):
+ parallel_jobs = min(len(tests_to_run), int(acme_util.get_machine_info("MAX_TASKS_PER_NODE")))
+
expect(len(tests_to_run) > 0, "No tests to run")
if (not old):
@@ -220,7 +230,7 @@ def create_test(testargs, compiler, no_run, no_build, no_batch, test_root,
baseline_root, baseline_name,
clean,
compare, generate, namelists_only,
- project)
+ project, parallel_jobs)
return 0 if impl.create_test() else 1
else:
@@ -274,11 +284,11 @@ def _main_func(description):
acme_util.stop_buffering_output()
testargs, compiler, no_run, no_build, no_batch, test_root, baseline_root, clean, \
- compare, generate, baseline_name, namelists_only, project, test_id, old = \
+ compare, generate, baseline_name, namelists_only, project, test_id, old, parallel_jobs = \
parse_command_line(sys.argv, description)
sys.exit(create_test(testargs, compiler, no_run, no_build, no_batch, test_root, baseline_root, clean,
- compare, generate, baseline_name, namelists_only, project, test_id, old))
+ compare, generate, baseline_name, namelists_only, project, test_id, old, parallel_jobs))
###############################################################################
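
A self-contained sketch of the new option end to end, combining the parser addition, the validation, and the default chosen above (argparse usage as in the patch; the test and machine values are hard-coded stand-ins, not part of the patch):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-j", "--parallel-jobs", type=int, default=None,
                        help="Number of tests to process simultaneously")
    args = parser.parse_args(["-j", "4"])
    assert args.parallel_jobs is None or args.parallel_jobs > 0

    num_tests, max_tasks_per_node = 10, 16  # illustrative stand-ins
    parallel_jobs = args.parallel_jobs
    if (parallel_jobs is None):
        parallel_jobs = min(num_tests, max_tasks_per_node)
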
diff --git a/cime/scripts-acme/create_test_impl.py b/cime/scripts-acme/create_test_impl.py
index 90bcf4d3c38a..d793626462e8 100644
--- a/cime/scripts-acme/create_test_impl.py
+++ b/cime/scripts-acme/create_test_impl.py
@@ -2,19 +2,21 @@
Implementation of create_test functionality from CIME
"""
-import sys, os, shutil, traceback, stat, glob
+import sys, os, shutil, traceback, stat, glob, threading, Queue, time
import acme_util, compare_namelists, wait_for_tests
from acme_util import expect, warning, verbose_print, run_cmd
-from wait_for_tests import TEST_PASSED_STATUS, TEST_FAIL_STATUS, TEST_PENDING_STATUS, TEST_STATUS_FILENAME, NAMELIST_FAIL_STATUS, RUN_PHASE, NAMELIST_PHASE
+from wait_for_tests import TEST_PASS_STATUS, TEST_FAIL_STATUS, TEST_PENDING_STATUS, TEST_STATUS_FILENAME, NAMELIST_FAIL_STATUS, RUN_PHASE, NAMELIST_PHASE
-INITIAL_PHASE = "INITIAL_PHASE"
+INITIAL_PHASE = "INIT"
CREATE_NEWCASE_PHASE = "CREATE_NEWCASE"
-XML_PHASE = "XML_PHASE"
+XML_PHASE = "XML"
SETUP_PHASE = "SETUP"
BUILD_PHASE = "BUILD"
+TEST_STATUS_PHASE = "TEST_STATUS"
PHASES = [INITIAL_PHASE, CREATE_NEWCASE_PHASE, XML_PHASE, SETUP_PHASE, NAMELIST_PHASE, BUILD_PHASE, RUN_PHASE] # Order matters
+CONTINUE = [TEST_PASS_STATUS, NAMELIST_FAIL_STATUS] # Statuses that allow a test to proceed to its next phase
###############################################################################
class CreateTest(object):
@@ -27,7 +29,7 @@ def __init__(self, test_names,
baseline_root, baseline_name,
clean,
compare, generate, namelists_only,
- project):
+ project, parallel_jobs):
###########################################################################
self._test_names = test_names
self._no_run = no_run
@@ -42,14 +44,32 @@ def __init__(self, test_names,
self._generate = generate
self._namelists_only = namelists_only
self._project = project
+ self._parallel_jobs = parallel_jobs
self._cime_root = acme_util.get_cime_root()
- self._test_states = [ (INITIAL_PHASE, TEST_PASSED_STATUS) ] * len(test_names)
+ pes = int(acme_util.get_machine_info("MAX_TASKS_PER_NODE"))
+
+ # Members shared between threads must be protected with the mutex
+ self._test_states = [ (INITIAL_PHASE, TEST_PASS_STATUS) ] * len(test_names)
+ self._proc_pool = int(pes * 1.25) # Oversubscribe the proc pool by 1/4
# Since the name-list phase can fail without aborting later phases, we
# need some extra state to remember tests that had namelist problems
- self._tests_with_nl_problems = []
+ self._tests_with_nl_problems = [None] * len(test_names)
+
+ self._mutex = threading.Lock()
+ self._available_work = Queue.Queue()
+
+ # Setup phases
+ self._phases = list(PHASES)
+ if (no_build):
+ self._phases.remove(BUILD_PHASE)
+ if (no_run):
+ self._phases.remove(RUN_PHASE)
+ if (not self._compare and not self._generate):
+ self._phases.remove(NAMELIST_PHASE)
# Validate any assumptions that were not caught by the arg parser
@@ -87,15 +107,27 @@ def _get_test_dir(self, test_name):
return os.path.join(self._test_root, self._get_case_id(test_name))
###########################################################################
- def _get_test_data(self, test_name, idx):
+ def _get_test_data(self, test_name, idx=None):
###########################################################################
+ assert self._mutex.locked()
+
state_idx = self._test_names.index(test_name)
- return self._test_states[state_idx][idx]
+ if (idx is None):
+ return self._test_states[state_idx]
+ else:
+ return self._test_states[state_idx][idx]
###########################################################################
def _is_broken(self, test_name):
###########################################################################
- return self._get_test_status(test_name) not in [TEST_PASSED_STATUS, NAMELIST_FAIL_STATUS, TEST_PENDING_STATUS]
+ status = self._get_test_status(test_name)
+ return status not in CONTINUE and status != TEST_PENDING_STATUS
+
+ ###########################################################################
+ def _work_remains(self, test_name):
+ ###########################################################################
+ test_phase, test_status = self._get_test_data(test_name)
+ return (test_status in CONTINUE or test_status == TEST_PENDING_STATUS) and test_phase != self._phases[-1]
###########################################################################
def _get_test_status(self, test_name, phase=None):
@@ -105,10 +137,10 @@ def _get_test_status(self, test_name, phase=None):
elif (phase is None or phase == self._get_test_phase(test_name)):
return self._get_test_data(test_name, 1)
else:
- expect(phase is None or PHASES.index(phase) < PHASES.index(self._get_test_phase(test_name)),
+ expect(phase is None or self._phases.index(phase) < self._phases.index(self._get_test_phase(test_name)),
"Tried to see the future")
# Assume all older phases PASSed
- return TEST_PASSED_STATUS
+ return TEST_PASS_STATUS
###########################################################################
def _get_test_phase(self, test_name):
@@ -118,18 +150,27 @@ def _get_test_phase(self, test_name):
###########################################################################
def _update_test_status(self, test_name, phase, status):
###########################################################################
- state_idx = self._test_names.index(test_name)
- phase_idx = PHASES.index(phase)
+ assert self._mutex.locked()
- expect(not self._is_broken(test_name),
- "Why did we move on to next phase when prior phase did not pass?")
- expect(PHASES.index(self._get_test_phase(test_name)) == phase_idx - 1,
- "Skipped phase?")
+ state_idx = self._test_names.index(test_name)
+ phase_idx = self._phases.index(phase)
+ old_phase, old_status = self._test_states[state_idx]
+
+ if (old_phase == phase):
+ expect(old_status == TEST_PENDING_STATUS,
+ "Only valid to transition from PENDING to something else, found '%s'" % old_status)
+ expect(status != TEST_PENDING_STATUS,
+ "Cannot transition from PEND -> PEND")
+ else:
+ expect(old_status in CONTINUE,
+ "Why did we move on to next phase when prior phase did not pass?")
+ expect(self._phases.index(old_phase) == phase_idx - 1,
+ "Skipped phase?")
self._test_states[state_idx] = (phase, status)
###########################################################################
- def _run_phase_command(self, test_name, cmd, phase, from_dir=None, pass_status=TEST_PASSED_STATUS):
+ def _run_phase_command(self, test_name, cmd, phase, from_dir=None):
###########################################################################
rc, output, errput = run_cmd(cmd, ok_to_fail=True, from_dir=from_dir)
if (rc != 0):
@@ -141,40 +182,40 @@ def _run_phase_command(self, test_name, cmd, phase, from_dir=None, pass_status=T
"%s PASSED for test '%s'.\nCommand: %s\nOutput: %s\n\nErrput: %s" %
(phase, test_name, cmd, output, errput))
- self._update_test_status(test_name, phase, pass_status if rc == 0 else TEST_FAIL_STATUS)
return rc == 0
###########################################################################
- def _create_newcase(self, test_name):
+ def _create_newcase_phase(self, test_name):
###########################################################################
- print "Creating case for", test_name
+ test_dir = self._get_test_dir(test_name)
test_case, case_opts, grid, compset, machine, compiler, test_mods = acme_util.parse_test_name(test_name)
- scratch_dir = acme_util.get_machine_info("CESMSCRATCHROOT", machine=machine, project=self._project)
-
- test_dir = self._get_test_dir(test_name)
+ if (self._parallel_jobs == 1):
+ scratch_dir = acme_util.get_machine_info("CESMSCRATCHROOT", machine=machine, project=self._project)
+ sharedlibroot = os.path.join(scratch_dir, "sharedlibroot.%s" % self._test_id)
+ else:
+ # Parallelizing builds introduces potential sync problems with sharedlibroot
+ # Just let every case build its own
+ sharedlibroot = os.path.join(test_dir, "sharedlibroot.%s" % self._test_id)
create_newcase_cmd = "%s -silent -case %s -res %s -mach %s -compiler %s -compset %s -testname %s -project %s -nosavetiming -sharedlibroot %s" % \
(os.path.join(self._cime_root, "scripts", "create_newcase"),
test_dir, grid, machine, compiler, compset, test_case, self._project,
- os.path.join(scratch_dir, "sharedlibroot.%s" % self._test_id))
+ sharedlibroot)
if (case_opts is not None):
create_newcase_cmd += " -confopts _%s" % ("_".join(case_opts))
if (test_mods is not None):
test_mod_file = os.path.join(self._cime_root, "scripts", "Testing", "Testlistxml", "testmods_dirs", test_mods)
if (not os.path.exists(test_mod_file)):
self._log_output(test_name, "Missing testmod file '%s'" % test_mod_file)
- self._update_test_status(test_name, CREATE_NEWCASE_PHASE, TEST_FAIL_STATUS)
return False
create_newcase_cmd += " -user_mods_dir %s" % test_mod_file
return self._run_phase_command(test_name, create_newcase_cmd, CREATE_NEWCASE_PHASE)
###########################################################################
- def _set_up_xml(self, test_name):
+ def _xml_phase(self, test_name):
###########################################################################
- print "Setting up XML for", test_name
-
test_case, _, _, _, machine, _, _ = acme_util.parse_test_name(test_name)
xml_file = os.path.join(self._get_test_dir(test_name), "env_test.xml")
@@ -213,10 +254,8 @@ def _set_up_xml(self, test_name):
return self._run_phase_command(test_name, xml_bridge_cmd, XML_PHASE)
###########################################################################
- def _setup_test(self, test_name):
+ def _setup_phase(self, test_name):
###########################################################################
- print "Setting up test case", test_name
-
test_case = acme_util.parse_test_name(test_name)[0]
test_dir = self._get_test_dir(test_name)
test_case_definition_dir = os.path.join(self._cime_root, "scripts", "Testing", "Testcases")
@@ -230,11 +269,8 @@ def _setup_test(self, test_name):
return self._run_phase_command(test_name, "./cesm_setup", SETUP_PHASE, from_dir=test_dir)
###########################################################################
- def _process_namelists(self, test_name):
+ def _nlcomp_phase(self, test_name):
###########################################################################
- if (self._compare or self._generate):
- print "Processing namelists for test case", test_name
-
test_dir = self._get_test_dir(test_name)
casedoc_dir = os.path.join(test_dir, "CaseDocs")
baseline_dir = os.path.join(self._baseline_root, self._baseline_name, test_name)
@@ -267,7 +303,8 @@ def _process_namelists(self, test_name):
self._log_output(test_name, output)
if (has_fails):
- self._tests_with_nl_problems.append(test_name)
+ idx = self._test_names.index(test_name)
+ self._tests_with_nl_problems[idx] = test_name
elif (self._generate):
if (not os.path.isdir(baseline_dir)):
@@ -280,39 +317,39 @@ def _process_namelists(self, test_name):
shutil.copy2(item, baseline_dir)
# Always mark as passed unless we hit exception
- self._update_test_status(test_name, NAMELIST_PHASE, TEST_PASSED_STATUS)
return True
###########################################################################
- def _build_test(self, test_name):
+ def _build_phase(self, test_name):
###########################################################################
- print "Building test case", test_name
-
case_id = self._get_case_id(test_name)
test_dir = self._get_test_dir(test_name)
return self._run_phase_command(test_name, "./%s.test_build" % case_id, BUILD_PHASE, from_dir=test_dir)
###########################################################################
- def _run_test(self, test_name):
+ def _run_phase(self, test_name):
###########################################################################
- print "Running test case", test_name
-
case_id = self._get_case_id(test_name)
test_dir = self._get_test_dir(test_name)
if (self._no_batch):
- return self._run_phase_command(test_name, "./%s.test" % case_id, RUN_PHASE, from_dir=test_dir, pass_status=TEST_PASSED_STATUS)
+ return self._run_phase_command(test_name, "./%s.test" % case_id, RUN_PHASE, from_dir=test_dir)
else:
- return self._run_phase_command(test_name, "./%s.submit" % case_id, RUN_PHASE, from_dir=test_dir, pass_status=TEST_PENDING_STATUS)
+ return self._run_phase_command(test_name, "./%s.submit" % case_id, RUN_PHASE, from_dir=test_dir)
###########################################################################
- def _test_status_phase(self, test_name):
+ def _update_test_status_file(self, test_name):
###########################################################################
+ # TODO: The run scripts heavily use the TestStatus file, so we write out
+ # the phases we have taken care of and then let the run scripts go from there.
+ # Eventually, it would be nice to have TestStatus management encapsulated
+ # into a single place.
+
str_to_write = ""
- made_it_to_phase = PHASES.index(self._get_test_phase(test_name))
- for phase in PHASES[0:made_it_to_phase+1]:
+ made_it_to_phase = self._phases.index(self._get_test_phase(test_name))
+ for phase in self._phases[0:made_it_to_phase+1]:
str_to_write += "%s %s %s\n" % (self._get_test_status(test_name, phase), test_name, phase)
- if (not self._no_run and not self._is_broken(test_name)):
+ if (not self._no_run and not self._is_broken(test_name) and self._phases[made_it_to_phase] == BUILD_PHASE):
# Ensure PEND state always gets added to TestStatus file if we are
# about to run test
str_to_write += "%s %s %s\n" % (TEST_PENDING_STATUS, test_name, RUN_PHASE)
@@ -325,19 +362,117 @@ def _test_status_phase(self, test_name):
self._log_output(test_name, "VERY BAD! Could not make TestStatus file '%s': '%s'" % (test_status_file, str(e)))
###########################################################################
- def _run_catch_exceptions(self, phase, run):
+ def _run_catch_exceptions(self, test_name, phase, run):
###########################################################################
- for test_name in self._test_names:
+ try:
+ return run(test_name)
+ except Exception as e:
+ exc_tb = sys.exc_info()[2]
+ errput = "Test '%s' failed in phase '%s' with exception '%s'" % (test_name, phase, str(e))
+ self._log_output(test_name, errput)
+ warning("Caught exception: %s" % str(e))
+ traceback.print_tb(exc_tb)
+ return False
+
+ ###########################################################################
+ def _get_procs_needed(self, test_name, phase):
+ ###########################################################################
+ if (phase == RUN_PHASE and self._no_batch):
+ test_dir = self._get_test_dir(test_name)
+ out = run_cmd("./xmlquery TOTALPES", from_dir=test_dir)
+ return int(out.split()[-1])
+ else:
+ return 1
+
+ ###########################################################################
+ def _handle_test_status_file(self, test_name, test_phase, success):
+ ###########################################################################
+ #
+ # This complexity is due to sharing of TestStatus responsibilities
+ #
+
+ if (test_phase != RUN_PHASE and
+ (not success or test_phase == BUILD_PHASE or test_phase == self._phases[-1])):
+ self._update_test_status_file(test_name)
+
+ # If we failed VERY early on in the run phase, it's possible that
+ # the CIME scripts never got a chance to set the state.
+ elif (test_phase == RUN_PHASE and not success):
+ test_status_file = os.path.join(self._get_test_dir(test_name), TEST_STATUS_FILENAME)
+
try:
- if (not self._is_broken(test_name)):
- run(test_name)
+ statuses = wait_for_tests.parse_test_status_file(test_status_file)[0]
+ if (RUN_PHASE not in statuses):
+ self._update_test_status_file(test_name)
+ elif (statuses[RUN_PHASE] in [TEST_PASS_STATUS, TEST_PENDING_STATUS]):
+ self._log_output(test_name,
+ "VERY BAD! How was infrastructure able to log a TestState but not change it to FAIL?")
+
except Exception as e:
- exc_tb = sys.exc_info()[2]
- errput = "Test '%s' failed in phase '%s' with exception '%s'" % (test_name, phase, str(e))
- self._log_output(test_name, errput)
- warning("Caught exception: %s" % str(e))
- traceback.print_tb(exc_tb)
- self._update_test_status(test_name, phase, TEST_FAIL_STATUS)
+ self._log_output(test_name, "VERY BAD! Could not read TestStatus file '%s': '%s'" % (test_status_file, str(e)))
+
+ ###########################################################################
+ def _consumer(self):
+ ###########################################################################
+ while (True):
+ found_work = False
+ with self._mutex:
+ if (not self._available_work.empty()):
+ test_name, test_phase, phase_method, procs_needed = self._available_work.get()
+ found_work = True
+
+ if (found_work):
+ before_time = time.time()
+ success = self._run_catch_exceptions(test_name, test_phase, phase_method)
+ elapsed_time = time.time() - before_time
+ # Batch-submitted runs stay PEND; otherwise PASS on success, FAIL on failure
+ status = (TEST_PENDING_STATUS if test_phase == RUN_PHASE and not self._no_batch else TEST_PASS_STATUS) if success else TEST_FAIL_STATUS
+
+ with self._mutex:
+ self._update_test_status(test_name, test_phase, status)
+ self._proc_pool += procs_needed
+ self._handle_test_status_file(test_name, test_phase, success)
+
+ sys.stdout.write("Finished %s for test %s in %f seconds (%s)\n" % (test_phase, test_name, elapsed_time, status))
+
+ else:
+ # Check if this thread is still needed
+ with self._mutex:
+ num_tests_that_still_have_work = 0
+ for test_name in self._test_names:
+ if (self._work_remains(test_name)):
+ num_tests_that_still_have_work += 1
+
+ if (num_tests_that_still_have_work < self._parallel_jobs):
+ self._parallel_jobs -= 1
+ break
+
+ time.sleep(5)
+
+ sys.stdout.write("CONSUMER THREAD EXITING\n")
+
+ ###########################################################################
+ def _producer(self):
+ ###########################################################################
+ work_to_do = True
+ while (work_to_do):
+ work_to_do = False
+ with self._mutex:
+ for test_name in self._test_names:
+ test_phase, test_status = self._get_test_data(test_name)
+ if (self._work_remains(test_name)):
+ work_to_do = True
+ if (test_status != TEST_PENDING_STATUS):
+ next_phase = self._phases[self._phases.index(test_phase) + 1]
+ procs_needed = self._get_procs_needed(test_name, next_phase)
+ if (procs_needed <= self._proc_pool):
+ self._proc_pool -= procs_needed
+ sys.stdout.write("Starting %s for test %s with %d procs\n" % (next_phase, test_name, procs_needed)) # Necessary when multi threads
+ self._update_test_status(test_name, next_phase, TEST_PENDING_STATUS)
+ self._available_work.put( (test_name, next_phase, getattr(self, "_%s_phase" % next_phase.lower()), procs_needed ) )
+
+ time.sleep(1)
+
+ sys.stdout.write("MAIN THREAD EXITING\n")
###########################################################################
def _setup_cs_files(self):
@@ -374,76 +509,42 @@ def create_test(self):
"""
Main API for this class.
"""
+ start_time = time.time()
+
# Tell user what will be run
print "RUNNING TESTS:"
for test_name in self._test_names:
print " ", test_name
- # TODO - do in parallel
# TODO - unit tests where possible
# TODO - documentation
- # Create cases
- self._run_catch_exceptions(CREATE_NEWCASE_PHASE, self._create_newcase)
-
- # Set up XML
- self._run_catch_exceptions(XML_PHASE, self._set_up_xml)
-
- # Setup
- self._run_catch_exceptions(SETUP_PHASE, self._setup_test)
-
- # Namelists
- self._run_catch_exceptions(NAMELIST_PHASE, self._process_namelists)
-
- # Build
- if (not self._no_build):
- self._run_catch_exceptions(BUILD_PHASE, self._build_test)
-
- # TODO: The run scripts heavily use the TestStatus file. So we write out
- # the phases we have taken care of and then let the run scrips go from there
- # Eventually, it would be nice to have TestStatus management encapsulated
- # into a single place.
+ for _ in xrange(self._parallel_jobs):
+ t = threading.Thread(target=self._consumer)
+ t.start()
- # Make status files
- for test_name in self._test_names:
- self._test_status_phase(test_name)
+ self._producer()
- # Run
- if (not self._no_run):
- self._run_catch_exceptions(RUN_PHASE, self._run_test)
+ while (threading.active_count() > 1):
+ time.sleep(1)
# Setup cs files
self._setup_cs_files()
- # If we failed VERY early on in the run phase, it's possible that
- # the CIME scripts never got a chance to set the state.
- if (not self._no_run):
- for test_name in self._test_names:
- if (self._get_test_phase(test_name) == RUN_PHASE and
- self._is_broken(test_name)):
- try:
- test_status_file = os.path.join(self._get_test_dir(test_name), TEST_STATUS_FILENAME)
- statuses = wait_for_tests.parse_test_status_file(test_status_file)[0]
- if (RUN_PHASE not in statuses):
- self._test_status_phase(test_name)
- elif (statuses[RUN_PHASE] in [TEST_PASSED_STATUS, TEST_PENDING_STATUS]):
- self._log_output(test_name,
- "VERY BAD! How was infrastructure able to log a TestState but not change it to FAIL?")
- except Exception as e:
- self._log_output(test_name, "VERY BAD! Could not read TestStatus file '%s': '%s'" % (test_status_file, str(e)))
-
# Return True if all tests passed
print "At create_test close, state is:"
rv = True
for idx, test_name in enumerate(self._test_names):
phase, status = self._test_states[idx]
- if (status not in [TEST_PASSED_STATUS, TEST_PENDING_STATUS]):
+ if (status not in [TEST_PASS_STATUS, TEST_PENDING_STATUS]):
print "%s %s (phase %s)" % (status, test_name, phase)
rv = False
elif (test_name in self._tests_with_nl_problems):
print "%s %s (but otherwise OK)" % (NAMELIST_FAIL_STATUS, test_name)
rv = False
else:
- print status, test_name
+ print status, test_name, phase
+
+ print "create_test took", time.time() - start_time, "seconds"
return rv
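
Stepping back from the hunks: the rewritten create_test_impl.py replaces the old phase-by-phase loops with one producer thread handing out (test, phase, procs) work items and N consumer threads executing them, all throttled by a shared proc pool. A condensed, runnable sketch of that scheme (Python 2, like the patch; per-test phase ordering, TestStatus bookkeeping, and the real phase commands are omitted, and all names and values below are illustrative):

    import threading, Queue, time

    class ThrottledRunner(object):
        """One work item per test, bounded by a shared proc pool."""
        def __init__(self, pool_size, tests):
            self._procs = pool_size      # Free procs; guarded by the mutex
            self._pending = list(tests)  # Tests not yet dispatched
            self._in_flight = 0          # Dispatched but not finished
            self._mutex = threading.Lock()
            self._work = Queue.Queue()

        def _consumer(self):
            while (True):
                with self._mutex:
                    item = None if self._work.empty() else self._work.get()
                    done = item is None and not self._pending and self._in_flight == 0
                if (done):
                    break
                if (item is None):
                    time.sleep(0.1)      # Nothing ready yet; poll again
                    continue
                test, procs = item
                time.sleep(0.2)          # Stand-in for _run_phase_command
                with self._mutex:
                    self._procs += procs # Return procs to the pool
                    self._in_flight -= 1
                print "Finished", test

        def run(self, num_threads):
            threads = [threading.Thread(target=self._consumer) for _ in xrange(num_threads)]
            for t in threads:
                t.start()
            while (True):                # Producer: dispatch as procs free up
                with self._mutex:
                    if (not self._pending):
                        break
                    for test in list(self._pending):
                        procs = 1        # The patch queries TOTALPES for no-batch runs
                        if (procs <= self._procs):
                            self._procs -= procs
                            self._in_flight += 1
                            self._pending.remove(test)
                            self._work.put((test, procs))
                time.sleep(0.1)
            for t in threads:
                t.join()

    ThrottledRunner(pool_size=2, tests=["ERS.f19_g16.X", "SMS.f45_g37.A"]).run(num_threads=2)
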
diff --git a/cime/scripts-acme/tests/scripts_regression_tests b/cime/scripts-acme/tests/scripts_regression_tests
index 7c5869c98754..724364876070 100755
--- a/cime/scripts-acme/tests/scripts_regression_tests
+++ b/cime/scripts-acme/tests/scripts_regression_tests
@@ -368,6 +368,9 @@ class TestJenkinsGenericJob(unittest.TestCase):
self._thread_error = None
self._unset_proxy = setup_proxy()
self._baseline_name = "fake_testing_only_%s" % acme_util.get_utc_timestamp()
+ self._fake_test_id = "fakest_testing_only_%s" % acme_util.get_utc_timestamp()
+ self._testroot = acme_util.get_machine_info()[4] # Index 4 of the machine info tuple is CESMSCRATCHROOT
+ self._jenkins_root = os.path.join(self._testroot, "jenkins")
# wait_for_tests could blow away our dart config, need to back it up
if (os.path.exists(DART_CONFIG)):
@@ -387,6 +390,11 @@ class TestJenkinsGenericJob(unittest.TestCase):
if (os.path.isdir(baselines)):
shutil.rmtree(baselines)
+ for root in [self._testroot, self._jenkins_root]:
+ for test_id in [self._baseline_name, self._fake_test_id]:
+ for leftover in glob.glob(os.path.join(root, "%s*" % test_id)):
+ shutil.rmtree(leftover)
+
if (os.path.exists(DART_BACKUP)):
shutil.move(DART_BACKUP, DART_CONFIG)
@@ -410,23 +418,43 @@ class TestJenkinsGenericJob(unittest.TestCase):
###########################################################################
def assert_no_sentinel(self):
###########################################################################
- testroot = acme_util.get_machine_info("CESMSCRATCHROOT")
- self.assertFalse(os.path.isfile(os.path.join(testroot, "ONGOING_TEST")),
+ self.assertFalse(os.path.isfile(os.path.join(self._testroot, "ONGOING_TEST")),
"job did not cleanup successfully")
+ ###########################################################################
+ def assert_num_leftovers(self, test_id=None):
+ ###########################################################################
+ test_id = self._baseline_name if test_id is None else test_id
+
+ # Each test in the acme_tiny suite should leave exactly one directory
+ # matching the test_id in both the testroot (bld/run dump area) and
+ # the jenkins root
+ num_tests_in_tiny = len(update_acme_tests.get_test_suite("acme_tiny"))
+
+ self.assertEqual(num_tests_in_tiny, len(glob.glob("%s/%s*" % (self._jenkins_root, test_id))),
+ msg="Wrong number of leftover directories in %s" % self._jenkins_root)
+
+ self.assertEqual(num_tests_in_tiny, len(glob.glob("%s/%s*" % (self._testroot, self._test_id))),
+ msg="Wrong number of leftover directories in %s" % self._test_root)
+
###########################################################################
def test_jenkins_generic_job(self):
###########################################################################
# Unfortunately, this test is very long-running
+ # Make some fake results to simulate a run of a different branch
+ for idx in xrange(len(update_acme_tests.get_test_suite("acme_tiny"))):
+ os.makedirs(os.path.join(self._testroot, "%s_%d" % (self._fake_test_id, idx)))
+ os.makedirs(os.path.join(self._jenkins_root, "%s_%d" % (self._fake_test_id, idx)))
+
# Generate fresh baselines so that this test is not impacted by
# unresolved diffs
self.simple_test(True, "-g -b %s" % self._baseline_name)
+ self.assert_num_leftovers()
build_name = "jenkins_generic_job_pass_%s" % acme_util.get_utc_timestamp()
self.simple_test(True, "-p ACME_test -b %s -d -c %s --cdash-build-group=Nightly" % (self._baseline_name, build_name))
+ self.assert_num_leftovers()
self.assert_no_sentinel()
+ # Fake results from the other branch must not have been cleaned up
+ self.assert_num_leftovers(test_id=self._fake_test_id)
assert_dashboard_has_build(self, build_name)
###########################################################################
diff --git a/cime/scripts-acme/wait_for_tests.py b/cime/scripts-acme/wait_for_tests.py
index 5c8eab93b52f..9650bbc43014 100644
--- a/cime/scripts-acme/wait_for_tests.py
+++ b/cime/scripts-acme/wait_for_tests.py
@@ -8,7 +8,7 @@
TEST_STATUS_FILENAME = "TestStatus"
TEST_PENDING_STATUS = "PEND"
-TEST_PASSED_STATUS = "PASS"
+TEST_PASS_STATUS = "PASS"
TEST_FAIL_STATUS = "FAIL"
TEST_DIFF_STATUS = "DIFF"
NAMELIST_FAIL_STATUS = "NLFAIL"
@@ -150,7 +150,7 @@ def create_cdash_xml(results, cdash_build_name, cdash_project, cdash_build_group
for test_name in sorted(results):
test_path, test_status = results[test_name]
- test_passed = test_status == TEST_PASSED_STATUS
+ test_passed = test_status == TEST_PASS_STATUS
test_norm_path = test_path if os.path.isdir(test_path) else os.path.dirname(test_path)
full_test_elem = xmlet.SubElement(testing_elem, "Test")
@@ -213,21 +213,21 @@ def reduce_stati(stati, check_throughput=False, check_memory=False, ignore_namel
is given to unfinished stati since we don't want to stop waiting for a test
that hasn't finished. Namelist diffs are given the lowest precedence.
"""
- rv = TEST_PASSED_STATUS
+ rv = TEST_PASS_STATUS
for phase, status in stati.iteritems():
if (status == TEST_PENDING_STATUS):
return status
- elif (status != TEST_PASSED_STATUS):
+ elif (status != TEST_PASS_STATUS):
if ( (not check_throughput and THROUGHPUT_TEST_STR in phase) or
(not check_memory and MEMORY_TEST_STR in phase) or
(ignore_namelists and phase == NAMELIST_PHASE) ):
continue
- if (status == NAMELIST_FAIL_STATUS and rv == TEST_PASSED_STATUS):
+ if (status == NAMELIST_FAIL_STATUS and rv == TEST_PASS_STATUS):
rv = NAMELIST_FAIL_STATUS
- elif (rv in [NAMELIST_FAIL_STATUS, TEST_PASSED_STATUS] and phase == HIST_COMPARE_PHASE):
+ elif (rv in [NAMELIST_FAIL_STATUS, TEST_PASS_STATUS] and phase == HIST_COMPARE_PHASE):
rv = TEST_DIFF_STATUS
else:
@@ -382,7 +382,7 @@ def wait_for_tests(test_paths,
test_path, test_status = test_data
print "Test '%s' finished with status '%s'" % (test_name, test_status)
verbose_print(" Path: %s" % test_path)
- all_pass &= test_status == TEST_PASSED_STATUS
+ all_pass &= test_status == TEST_PASS_STATUS
if (cdash_build_name):
create_cdash_xml(test_results, cdash_build_name, cdash_project, cdash_build_group)
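
To summarize the precedence reduce_stati implements (ignoring the throughput, memory, and ignore-namelists filters): PEND wins outright, then a hard FAIL, then DIFF, then NLFAIL, then PASS. A compact sanity sketch, where the status strings match the constants above but the helper itself is illustrative and not part of the patch:

    # Strongest status first; reduce_stati returns the strongest one present
    PRECEDENCE = ["PEND", "FAIL", "DIFF", "NLFAIL", "PASS"]

    def strongest(statuses):
        # Reduce a collection of per-phase statuses to one overall status
        return min(statuses, key=PRECEDENCE.index)

    assert strongest(["PASS", "NLFAIL"]) == "NLFAIL"
    assert strongest(["NLFAIL", "DIFF", "PASS"]) == "DIFF"
    assert strongest(["PASS", "PEND", "FAIL"]) == "PEND"
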