Merge pull request #1275 from ESMCI/jgfouca/titan_restart
First try at supporting Titan restart after node failure

Test suite: scripts_regression_tests
Test baseline:
Test namelist changes:
Test status: bit for bit

Fixes #1233

User interface changes?: Support for restarting when a node fails (if config_machines.xml has the right variables set)

Code review: JimE, Marianna
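
For reference, the machine-side configuration this depends on is the pair of variables added for titan in config_machines.xml in this diff (abridged here; other machines would set their own failure-string regex and spare-node percentage):

<machine MACH="titan">
  ...
  <NODE_FAIL_REGEX>Received node event ec_node</NODE_FAIL_REGEX>
  ...
  <PCT_SPARE_NODES>10</PCT_SPARE_NODES>
  ...
</machine>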
jgfouca authored Apr 4, 2017
2 parents fbf1b0c + 788e1fc commit eeb1e2c
Showing 11 changed files with 221 additions and 19 deletions.
2 changes: 2 additions & 0 deletions config/acme/machines/config_machines.xml
@@ -1382,6 +1382,7 @@
<machine MACH="titan">
<DESC>ORNL XK6, os is CNL, 16 pes/node, batch system is PBS</DESC>
<NODENAME_REGEX>titan</NODENAME_REGEX>
<NODE_FAIL_REGEX>Received node event ec_node</NODE_FAIL_REGEX>
<TESTS>acme_developer</TESTS>
<COMPILERS>pgi,pgiacc,intel,cray</COMPILERS>
<MPILIBS>mpich,mpi-serial</MPILIBS>
@@ -1402,6 +1403,7 @@
<SUPPORTED_BY>acme</SUPPORTED_BY>
<GMAKE_J>8</GMAKE_J>
<MAX_TASKS_PER_NODE>16</MAX_TASKS_PER_NODE>
<PCT_SPARE_NODES>10</PCT_SPARE_NODES>
<PROJECT_REQUIRED>TRUE</PROJECT_REQUIRED>
<PROJECT>cli115</PROJECT>
<PIO_CONFIG_OPTS> -D PIO_BUILD_TIMING:BOOL=ON </PIO_CONFIG_OPTS>
15 changes: 15 additions & 0 deletions config/config_tests.xml
@@ -361,6 +361,21 @@ LII CLM initial condition interpolation test
<DOUT_S>FALSE</DOUT_S>
</test>

<test NAME="NODEFAIL">
<DESC>For testing infra only. Tests restart upon detected node failure</DESC>
<INFO_DBUG>1</INFO_DBUG>
<STOP_OPTION>ndays</STOP_OPTION>
<STOP_N>11</STOP_N>
<REST_N>$STOP_N / 2 + 1</REST_N>
<REST_OPTION>$STOP_OPTION</REST_OPTION>
<HIST_N>$STOP_N</HIST_N>
<HIST_OPTION>$STOP_OPTION</HIST_OPTION>
<CONTINUE_RUN>FALSE</CONTINUE_RUN>
<CHECK_TIMING>FALSE</CHECK_TIMING>
<NODE_FAIL_REGEX>JGF FAKE NODE FAIL</NODE_FAIL_REGEX>
<PCT_SPARE_NODES>300</PCT_SPARE_NODES>
</test>
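
For reference, a rough sketch of how this test might be exercised by hand (the create_test invocation shape is taken from the regression tests added below; the scripts path and test-root placeholders are assumptions, not from this diff). The fake executable reads NODEFAIL_NUM_FAILS and defaults to failing 3 times before succeeding:

# Hypothetical invocation: force 5 simulated node failures, which is more than
# the 300% spare-node allowance of a single-node NODEFAIL_P1 case can absorb,
# so the run is expected to fail after exhausting its spares.
export NODEFAIL_NUM_FAILS=5
./scripts/create_test NODEFAIL_P1.f45_g37.X --test-root $TEST_ROOT --output-root $TEST_ROOT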

<test NAME="ICP">
<DESC>cice performance test</DESC>
<INFO_DBUG>1</INFO_DBUG>
68 changes: 68 additions & 0 deletions scripts/lib/CIME/SystemTests/nodefail.py
@@ -0,0 +1,68 @@
"""
CIME restart upon failed node test.
"""
from CIME.XML.standard_module_setup import *
from CIME.SystemTests.ers import ERS
from CIME.utils import get_model

logger = logging.getLogger(__name__)

class NODEFAIL(ERS):

def __init__(self, case):
"""
initialize an object interface to the ERS system test
"""
ERS.__init__(self, case)

self._fail_sentinel = os.path.join(case.get_value("RUNDIR"), "FAIL_SENTINEL")
self._fail_str = case.get_value("NODE_FAIL_REGEX")

def _restart_fake_phase(self):
# Swap out model.exe for one that emits node failures
rundir = self._case.get_value("RUNDIR")
exeroot = self._case.get_value("EXEROOT")
fake_exe = \
"""#!/bin/bash
fail_sentinel=%s
cpl_log=%s/cpl.log.$LID
model_log=%s/%s.log.$LID
touch $cpl_log
touch $fail_sentinel
declare -i num_fails=$(cat $fail_sentinel | wc -l)
declare -i times_to_fail=${NODEFAIL_NUM_FAILS:-3}
if ((num_fails < times_to_fail)); then
echo FAKE FAIL >> $cpl_log
echo FAIL >> $fail_sentinel
echo '%s' >> $model_log
sleep 1
exit -1
else
echo Insta pass
echo SUCCESSFUL TERMINATION > $cpl_log
fi
""" % (self._fail_sentinel, rundir, rundir, get_model(), self._fail_str)

fake_exe_file = os.path.join(exeroot, "fake.sh")
with open(fake_exe_file, "w") as fd:
fd.write(fake_exe)

os.chmod(fake_exe_file, 0755)

prev_run_exe = self._case.get_value("run_exe")
env_mach_specific = self._case.get_env("mach_specific")
env_mach_specific.set_value("run_exe", fake_exe_file)
self._case.flush(flushall=True)

self.run_indv(suffix=None)

env_mach_specific = self._case.get_env("mach_specific")
env_mach_specific.set_value("run_exe", prev_run_exe)
self._case.flush(flushall=True)

def run_phase(self):
self._ers_first_phase()
self._restart_fake_phase()
self._ers_second_phase()
5 changes: 4 additions & 1 deletion scripts/lib/CIME/XML/env_mach_pes.py
@@ -70,4 +70,7 @@ def get_tasks_per_node(self, total_tasks, max_thread_count):
    def get_total_nodes(self, total_tasks, max_thread_count):
        tasks_per_node = self.get_tasks_per_node(total_tasks, max_thread_count)
        num_nodes = int(math.ceil(float(total_tasks) / tasks_per_node))
        return num_nodes
        return num_nodes + self.get_spare_nodes(num_nodes)

    def get_spare_nodes(self, num_nodes):
        return int(math.ceil(float(num_nodes) * (self.get_value("PCT_SPARE_NODES") / 100.0)))
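
As a quick check of the arithmetic (numbers hypothetical, chosen for illustration): with titan's PCT_SPARE_NODES of 10, a layout needing 23 nodes would request ceil(23 * 0.10) = 3 spares, i.e. 26 nodes in total. A minimal standalone sketch of the same rounding:

import math

def spare_nodes(num_nodes, pct_spare_nodes):
    # Same rounding as get_spare_nodes above: any nonzero percentage
    # yields at least one spare node.
    return int(math.ceil(float(num_nodes) * (pct_spare_nodes / 100.0)))

print(spare_nodes(23, 10))    # -> 3 (so 26 nodes requested in total)
print(spare_nodes(1, 300))    # -> 3 (the NODEFAIL test's 300% on a one-node case)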
3 changes: 1 addition & 2 deletions scripts/lib/CIME/case.py
@@ -96,7 +96,6 @@ def __init__(self, case_root=None, read_only=True):
self._component_classes = []
self._is_env_loaded = False


self.thread_count = None
self.total_tasks = None
self.tasks_per_node = None
@@ -107,7 +106,6 @@ def __init__(self, case_root=None, read_only=True):
if self.get_value("CASEROOT") is not None:
self._initialize_derived_attributes()


def check_if_comp_var(self, vid):
vid = vid
comp = None
@@ -149,6 +147,7 @@ def _initialize_derived_attributes(self):
executable = env_mach_spec.get_mpirun(self, mpi_attribs, job="case.run", exe_only=True)[0]
if executable == "aprun":
self.num_nodes = get_aprun_cmd_for_case(self, "acme.exe")[1]
self.num_nodes = env_mach_pes.get_spare_nodes(self.num_nodes)
else:
self.num_nodes = env_mach_pes.get_total_nodes(self.total_tasks, self.thread_count)

61 changes: 52 additions & 9 deletions scripts/lib/CIME/case_run.py
@@ -5,6 +5,7 @@
from CIME.get_timing import get_timing
from CIME.provenance import save_prerun_provenance, save_postrun_provenance
from CIME.preview_namelists import create_namelists
from CIME.case_st_archive import case_st_archive, restore_from_archive

import shutil, time, sys, os, glob

@@ -80,9 +81,13 @@ def _run_model_impl(case, lid):

pre_run_check(case, lid)

model = case.get_value("MODEL")

# Set OMP_NUM_THREADS
env_mach_pes = case.get_env("mach_pes")
os.environ["OMP_NUM_THREADS"] = str(env_mach_pes.get_max_thread_count(case.get_values("COMP_CLASSES")))
comp_classes = case.get_values("COMP_CLASSES")
thread_count = env_mach_pes.get_max_thread_count(comp_classes)
os.environ["OMP_NUM_THREADS"] = str(thread_count)

# Run the model
logger.info("%s MODEL EXECUTION BEGINS HERE" %(time.strftime("%Y-%m-%d %H:%M:%S")))
@@ -92,11 +97,50 @@ def _run_model_impl(case, lid):
logger.info("run command is %s " %cmd)

rundir = case.get_value("RUNDIR")
run_cmd_no_fail(cmd, from_dir=rundir)
loop = True

total_tasks = env_mach_pes.get_total_tasks(comp_classes)
num_spare_nodes = env_mach_pes.get_spare_nodes(total_tasks, thread_count)

while loop:
loop = False
stat = run_cmd(cmd, from_dir=rundir)[0]
# Determine if failure was due to a failed node, if so, try to restart
if stat != 0:
node_fail_re = case.get_value("NODE_FAIL_REGEX")
if node_fail_re:
node_fail_regex = re.compile(node_fail_re)
model_logfile = os.path.join(rundir, model + ".log." + lid)
if os.path.exists(model_logfile):
num_fails = len(node_fail_regex.findall(open(model_logfile, 'r').read()))
if num_fails > 0 and num_spare_nodes >= num_fails:
# We failed due to node failure!
logger.warning("Detected model run failed due to node failure, restarting")

# Archive the last consistent set of restart files and restore them
case_st_archive(case, no_resubmit=True)
restore_from_archive(case)

orig_cont = case.get_value("CONTINUE_RUN")
if not orig_cont:
case.set_value("CONTINUE_RUN", True)
create_namelists(case)

lid = new_lid()
loop = True

num_spare_nodes -= num_fails

if not loop:
# We failed and we're not restarting
expect(False, "Command '%s' failed" % cmd)

logger.info("%s MODEL EXECUTION HAS FINISHED" %(time.strftime("%Y-%m-%d %H:%M:%S")))

post_run_check(case, lid)

return lid

###############################################################################
def run_model(case, lid):
###############################################################################
Expand All @@ -117,7 +161,7 @@ def post_run_check(case, lid):
if not os.path.isfile(model_logfile):
expect(False, "Model did not complete, no %s log file " % model_logfile)
elif not os.path.isfile(cpl_logfile):
expect(False, "Model did not complete, no cpl log file")
expect(False, "Model did not complete, no cpl log file '%s'" % cpl_logfile)
elif os.stat(model_logfile).st_size == 0:
expect(False, "Run FAILED")
else:
@@ -222,21 +266,20 @@ def case_run(case):
lid = new_lid()

if prerun_script:
print "DEBUG: i am here",prerun_script
do_external(prerun_script, case.get_value("CASEROOT"), case.get_value("RUNDIR"),
do_external(prerun_script, case.get_value("CASEROOT"), case.get_value("RUNDIR"),
lid, prefix="prerun")

run_model(case, lid)
lid = run_model(case, lid)
save_logs(case, lid) # Copy log files back to caseroot
if case.get_value("CHECK_TIMING") or case.get_value("SAVE_TIMING"):
get_timing(case, lid) # Run the getTiming script

if data_assimilation:
do_data_assimilation(data_assimilation_script, case.get_value("CASEROOT"), cycle, lid,
case.get_value("RUNDIR"))
do_data_assimilation(data_assimilation_script, case.get_value("CASEROOT"), cycle, lid,
case.get_value("RUNDIR"))

if postrun_script:
do_external(postrun_script, case.get_value("CASEROOT"), case.get_value("RUNDIR"),
do_external(postrun_script, case.get_value("CASEROOT"), case.get_value("RUNDIR"),
lid, prefix="postrun")

save_postrun_provenance(case)
24 changes: 20 additions & 4 deletions scripts/lib/CIME/case_st_archive.py
@@ -198,11 +198,10 @@ def get_histfiles_for_restarts(case, archive, archive_entry, restfile):
histfile = matchobj.group(1).strip()
histfile = os.path.basename(histfile)
# append histfile to the list ONLY if it exists in rundir before the archiving
if os.path.isfile(os.path.join(rundir,histfile)):
if os.path.isfile(os.path.join(rundir,histfile)):
histfiles.append(histfile)
return histfiles


###############################################################################
def _archive_restarts(case, archive, archive_entry,
compclass, compname, datename, datename_is_last):
@@ -353,9 +352,26 @@ def _archive_process(case, archive):
_archive_history_files(case, archive, archive_entry,
compclass, compname, histfiles_savein_rundir)

###############################################################################
def restore_from_archive(case):
###############################################################################
    """
    Take most recent archived restart files and load them into current case.
    """
    dout_sr = case.get_value("DOUT_S_ROOT")
    rundir = case.get_value("RUNDIR")
    most_recent_rest = run_cmd_no_fail("ls -1dt %s/rest/* | head -1" % dout_sr)

    for item in glob.glob("%s/*" % most_recent_rest):
        base = os.path.basename(item)
        dst = os.path.join(rundir, base)
        if os.path.exists(dst):
            os.remove(dst)

        shutil.copy(item, rundir)

###############################################################################
def case_st_archive(case):
def case_st_archive(case, no_resubmit=False):
###############################################################################
"""
Create archive object and perform short term archiving
@@ -387,7 +403,7 @@ def case_st_archive(case):

# resubmit case if appropriate
resubmit = case.get_value("RESUBMIT")
if resubmit > 0:
if resubmit > 0 and not no_resubmit:
logger.info("resubmitting from st_archive, resubmit=%d"%resubmit)
if case.get_value("MACH") == "mira":
expect(os.path.isfile(".original_host"), "ERROR alcf host file not found")
2 changes: 1 addition & 1 deletion scripts/lib/CIME/test_scheduler.py
@@ -830,7 +830,7 @@ def run_tests(self, wait=False):
ts_status = ts.get_overall_test_status(ignore_namelists=True, check_memory=False, check_throughput=False)

if ts_status not in [TEST_PASS_STATUS, TEST_PEND_STATUS]:
logger.info( "%s %s (phase %s)" % (status, test, phase))
logger.info( "%s %s (phase %s)" % (ts_status, test, phase))
rv = False
elif nlfail:
logger.info( "%s %s (but otherwise OK) %s" % (NAMELIST_FAIL_STATUS, test, phase))
40 changes: 40 additions & 0 deletions scripts/tests/scripts_regression_tests.py
@@ -910,6 +910,46 @@ def test_jenkins_generic_job_kill(self):
self.assertTrue(self._thread_error is None, msg="Thread had failure: %s" % self._thread_error)
assert_dashboard_has_build(self, build_name)

###############################################################################
class T_TestRunRestart(TestCreateTestCommon):
###############################################################################

###########################################################################
def test_run_restart(self):
###########################################################################
run_cmd_assert_result(self, "%s/create_test --test-root %s --output-root %s -t %s NODEFAIL_P1.f45_g37.X"
% (SCRIPT_DIR, TEST_ROOT, TEST_ROOT, self._baseline_name))
if self._hasbatch:
run_cmd_assert_result(self, "%s/wait_for_tests *%s/TestStatus" % (TOOLS_DIR, self._baseline_name),
from_dir=self._testroot)

casedir = os.path.join(self._testroot,
"%s.%s" % (CIME.utils.get_full_test_name("NODEFAIL_P1.f45_g37.X", machine=self._machine, compiler=self._compiler), self._baseline_name))

fail_sentinel = os.path.join(casedir, "run", "FAIL_SENTINEL")
self.assertTrue(os.path.exists(fail_sentinel), msg="Missing %s" % fail_sentinel)

self.assertEqual(open(fail_sentinel, "r").read().count("FAIL"), 3)

###########################################################################
def test_run_restart_too_many_fails(self):
###########################################################################
os.environ["NODEFAIL_NUM_FAILS"] = "5"
run_cmd_assert_result(self, "%s/create_test --test-root %s --output-root %s -t %s NODEFAIL_P1.f45_g37.X"
% (SCRIPT_DIR, TEST_ROOT, TEST_ROOT, self._baseline_name),
expected_stat=(0 if self._hasbatch else CIME.utils.TESTS_FAILED_ERR_CODE))
if self._hasbatch:
run_cmd_assert_result(self, "%s/wait_for_tests *%s/TestStatus" % (TOOLS_DIR, self._baseline_name),
from_dir=self._testroot, expected_stat=CIME.utils.TESTS_FAILED_ERR_CODE)

casedir = os.path.join(self._testroot,
"%s.%s" % (CIME.utils.get_full_test_name("NODEFAIL_P1.f45_g37.X", machine=self._machine, compiler=self._compiler), self._baseline_name))

fail_sentinel = os.path.join(casedir, "run", "FAIL_SENTINEL")
self.assertTrue(os.path.exists(fail_sentinel), msg="Missing %s" % fail_sentinel)

self.assertEqual(open(fail_sentinel, "r").read().count("FAIL"), 4)

###############################################################################
class Q_TestBlessTestResults(TestCreateTestCommon):
###############################################################################
18 changes: 18 additions & 0 deletions src/drivers/mct/cime_config/config_component.xml
@@ -1821,6 +1821,16 @@
</desc>
</entry>

<entry id="NODE_FAIL_REGEX">
<type>char</type>
<group>run_din</group>
<file>env_run.xml</file>
<desc>
A regular expression to search for an indication that a run failure was caused by a node failure
and should therefore be re-attempted.
</desc>
</entry>
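
To make the intended use concrete, a simplified sketch of the detection step from case_run.py above (the helper name and example log path are illustrative only; the real code also checks the remaining spare-node budget before restarting):

import re

def node_failures_in_log(node_fail_re, model_logfile):
    # Count NODE_FAIL_REGEX matches in the model log; a nonzero count is
    # treated as a node failure and the run is re-attempted.
    with open(model_logfile, "r") as fd:
        return len(re.compile(node_fail_re).findall(fd.read()))

# e.g. node_failures_in_log("Received node event ec_node", "acme.log.<LID>")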

<entry id="PROXY">
<type>char</type>
<default_value>UNSET</default_value>
@@ -1896,6 +1906,14 @@
<!-- definitions pelayout -->
<!-- ===================================================================== -->

<entry id="PCT_SPARE_NODES">
<type>integer</type>
<default_value>0</default_value>
<group>mach_pes</group>
<file>env_mach_pes.xml</file>
<desc>Percent of extra spare nodes to allocate</desc>
</entry>

<entry id="NTASKS">
<type>integer</type>
<values>
2 changes: 0 additions & 2 deletions src/share/util/shr_orb_mod.F90
@@ -78,8 +78,6 @@ real(SHR_KIND_R8) pure FUNCTION shr_orb_cosz(jday,lat,lon,declin,dt_avg)
shr_orb_cosz = shr_orb_avg_cosz(jday, lat, lon, declin, dt_avg)
else
shr_orb_cosz = sin(lat)*sin(declin) - &
! & cos(lat)*cos(declin)*cos(jday*2.0_SHR_KIND_R8*pi + lon)
! RLJ old version above. Restore more accurate version below in separate PR
cos(lat)*cos(declin) * &
cos((jday-floor(jday))*2.0_SHR_KIND_R8*pi + lon)
end if