Skip to content

Commit

Permalink
Add infrastructure to parse scheduler output for CalcJobs
Browse files Browse the repository at this point in the history
Add a new method `Scheduler.parse_output` that takes three arguments:
`detailed_job_info`, `stdout` and `stderr`, which are the dictionary
returned by `Scheduler.get_detailed_job_info` and the content of
scheduler stdout and stderr files from the repository, respectively.

A scheduler plugin can implement this method to parse the content of
these data sources to detect standard scheduler problems such as node
failures and out of memory errors. If such an error is detected, the
method can return an `ExitCode` that should be defined on the
calculation job class. The `CalcJob` base class already defines certain
exit codes for common errors, such as an out of memory error.

If the detailed job info, stdout and stderr from the scheduler output
are available after the job has been retrieved, and the scheduler plugin
that is used has implemented `parse_output`, it will be called by the
`CalcJob.parse` method. If an exit code is returned, it is set on the
corresponding node and a warning is logged. Subsequently, the normal
output parser is called, if any was defined in the inputs, which can
then of course check the node for the presence of an exit code. It then
has the opportunity to parse the retrieved output files, if any, to try
and determine a more specific error code, if applicable. Returning an
exit code from the output parser will override the exit code set by the
scheduler parser. This is why that exit code is also logged as a warning
so that the information is not completely lost.

This choice does change the old behavior when an output parser would
return `None` which would be interpreted as `ExitCode(0)`. However, now
if the scheduler parser returned an exit code, it will not be overridden
by the `None` of the output parser, which is then essentially ignored.
This is necessary, because otherwise, basic parsers that don't return
anything even if an error might have occurred will always just override
the scheduler exit code, which is not desirable.
  • Loading branch information
sphuber committed Aug 27, 2020
1 parent 477fe30 commit e72a4a3
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 80 deletions.
2 changes: 1 addition & 1 deletion aiida/backends/testimplbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def create_computer(self):
label='localhost',
hostname='localhost',
transport_type='local',
scheduler_type='pbspro',
scheduler_type='direct',
workdir='/tmp/aiida',
backend=self.backend
).store()
Expand Down
66 changes: 0 additions & 66 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import os

from aiida.common import AIIDA_LOGGER, exceptions
from aiida.common.datastructures import CalcJobState
from aiida.common.folders import SandboxFolder
from aiida.common.links import LinkType
from aiida.orm import FolderData, Node
Expand Down Expand Up @@ -429,71 +428,6 @@ def kill_calculation(calculation, transport):
return True


def parse_results(process, retrieved_temporary_folder=None):
    """Call the parser plugin, if one is configured, for the calculation job node of the given process.

    The content of the temporary retrieved folder, if any, is first logged at debug level. Then the parser class
    returned by the node is instantiated and its `parse` method is called. Every output registered by the parser
    is attached to the process through `process.out`.

    :param process: the process whose node is parsed; the node is asserted to be in the `PARSING` state
    :param retrieved_temporary_folder: optional path of the folder containing the temporarily retrieved files
    :returns: the resulting exit code, where a zero status indicates success and a non-zero status failure
    :raises ValueError: if the parser returns something that is neither an `ExitCode` nor `None`
    """
    from aiida.engine import ExitCode

    assert process.node.get_state() == CalcJobState.PARSING, \
        'job should be in the PARSING state when calling this function yet it is {}'.format(process.node.get_state())

    parser_class = process.node.get_parser_class()
    exit_code = ExitCode()
    logger_extra = get_dblogger_extra(process.node)

    if not retrieved_temporary_folder:
        execlogger.debug(
            '[parsing of calc {}] '
            'No retrieved_temporary_folder.'.format(process.node.pk), extra=logger_extra
        )
    else:
        # Build a flat listing of everything in the folder: directories first, then files, for each level
        listing = []
        for root, directories, filenames in os.walk(retrieved_temporary_folder):
            listing.extend('- [D] {}'.format(os.path.join(root, directory)) for directory in directories)
            listing.extend('- [F] {}'.format(os.path.join(root, filename)) for filename in filenames)

        execlogger.debug(
            '[parsing of calc {}] '
            'Content of the retrieved_temporary_folder: \n'
            '{}'.format(process.node.pk, '\n'.join(listing)),
            extra=logger_extra
        )

    # Without a parser there is nothing to do and the default exit code is returned
    if parser_class is None:
        return exit_code

    parser = parser_class(process.node)
    parse_kwargs = parser.get_outputs_for_parsing()

    if retrieved_temporary_folder:
        parse_kwargs['retrieved_temporary_folder'] = retrieved_temporary_folder

    exit_code = parser.parse(**parse_kwargs)

    # A parser that returns nothing is considered to have finished successfully
    if exit_code is None:
        exit_code = ExitCode(0)

    if not isinstance(exit_code, ExitCode):
        raise ValueError('parse should return an `ExitCode` or None, and not {}'.format(type(exit_code)))

    if exit_code.status:
        parser.logger.error('parser returned exit code<{}>: {}'.format(exit_code.status, exit_code.message))

    for link_label, node in parser.outputs.items():
        try:
            process.out(link_label, node)
        except ValueError as exception:
            # An invalid output replaces whatever exit code the parser returned; remaining outputs are skipped
            parser.logger.error('invalid value {} specified with label {}: {}'.format(node, link_label, exception))
            exit_code = process.exit_codes.ERROR_INVALID_OUTPUT
            break

    return exit_code


def _retrieve_singlefiles(job, transport, folder, retrieve_file_list, logger_extra=None):
"""Retrieve files specified through the singlefile list mechanism."""
singlefile_list = []
Expand Down
117 changes: 109 additions & 8 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from aiida.common.lang import override, classproperty
from aiida.common.links import LinkType

from ..exit_code import ExitCode
from ..process import Process, ProcessState
from ..process_spec import CalcJobProcessSpec
from .tasks import Waiting, UPLOAD_COMMAND
Expand Down Expand Up @@ -192,6 +193,14 @@ def define(cls, spec: CalcJobProcessSpec):
help='Files that are retrieved by the daemon will be stored in this node. By default the stdout and stderr '
'of the scheduler will be added, but one can add more by specifying them in `CalcInfo.retrieve_list`.')

# Errors caused or returned by the scheduler
spec.exit_code(100, 'ERROR_NO_RETRIEVED_FOLDER',
message='The process did not have the required `retrieved` output.')
spec.exit_code(110, 'ERROR_SCHEDULER_OUT_OF_MEMORY',
message='The job ran out of memory.')
spec.exit_code(120, 'ERROR_SCHEDULER_OUT_OF_WALLTIME',
message='The job ran out of walltime.')

@classproperty
def spec_options(cls): # pylint: disable=no-self-argument
"""Return the metadata options port namespace of the process specification of this process.
Expand Down Expand Up @@ -281,22 +290,114 @@ def parse(self, retrieved_temporary_folder=None):
This is called once it's finished waiting for the calculation to be finished and the data has been retrieved.
"""
import shutil
from aiida.engine.daemon import execmanager

try:
exit_code = execmanager.parse_results(self, retrieved_temporary_folder)
retrieved = self.node.outputs.retrieved
except exceptions.NotExistent:
return self.exit_codes.ERROR_NO_RETRIEVED_FOLDER # pylint: disable=no-member

# Call the scheduler output parser
exit_code_scheduler = self.parse_scheduler_output(retrieved)

if exit_code_scheduler is not None and exit_code_scheduler.status > 0:
# If an exit code is returned by the scheduler output parser, we log it and set it on the node. This will
# allow the actual `Parser` implementation, if defined in the inputs, to inspect it and decide to keep it,
# or override it with a more specific exit code, if applicable.
args = (exit_code_scheduler.status, exit_code_scheduler.message)
self.logger.warning('scheduler parser returned exit code<{}>: {}'.format(*args))
self.node.set_exit_status(exit_code_scheduler.status)
self.node.set_exit_message(exit_code_scheduler.message)

# Call the retrieved output parser
try:
exit_code_retrieved = self.parse_retrieved_output(retrieved_temporary_folder)
finally:
# Delete the temporary folder
try:
shutil.rmtree(retrieved_temporary_folder)
except OSError as exception:
if exception.errno != 2:
raise
shutil.rmtree(retrieved_temporary_folder, ignore_errors=True)

if exit_code_retrieved is not None and exit_code_retrieved.status > 0:
args = (exit_code_retrieved.status, exit_code_retrieved.message)
self.logger.warning('output parser returned exit code<{}>: {}'.format(*args))

# The final exit code is that of the scheduler, unless the output parser returned one
if exit_code_retrieved is not None:
exit_code = exit_code_retrieved
else:
exit_code = exit_code_scheduler

# Finally link up the outputs and we're done
for entry in self.node.get_outgoing():
self.out(entry.link_label, entry.node)

return exit_code or ExitCode(0)

def parse_scheduler_output(self, retrieved):
    """Call the output parser of the scheduler plugin with the scheduler information available for this job.

    The detailed job info is taken from the node and the content of the scheduler stdout and stderr files is read
    from `retrieved`. The scheduler parser is only called if all three are available; for each one that is missing
    a warning is logged.

    :param retrieved: object from which the scheduler stdout and stderr files are read via `get_object_content`
    :return: the value returned by `parse_output` of the scheduler, or `None` if not all information is available,
        the scheduler plugin has not implemented the parsing, or the parsing raised an exception
    :raises ValueError: if the scheduler parser returns something that is neither an `ExitCode` nor `None`
    """
    scheduler = self.node.computer.get_scheduler()
    filename_stderr = self.node.get_option('scheduler_stderr')
    filename_stdout = self.node.get_option('scheduler_stdout')

    def read_content(filename):
        """Return the content of `filename` in `retrieved`, or log a warning and return `None` if it is missing."""
        try:
            return retrieved.get_object_content(filename)
        except FileNotFoundError:
            self.logger.warning('could not parse scheduler output: the `{}` file is missing'.format(filename))
            return None

    detailed_job_info = self.node.get_detailed_job_info()
    if detailed_job_info is None:
        self.logger.warning('could not parse scheduler output: the `detailed_job_info` attribute is missing')

    # All sources are inspected before bailing out, such that a warning is logged for each one that is missing
    scheduler_stderr = read_content(filename_stderr)
    scheduler_stdout = read_content(filename_stdout)

    if detailed_job_info is None or scheduler_stderr is None or scheduler_stdout is None:
        return None

    try:
        result = scheduler.parse_output(detailed_job_info, scheduler_stdout, scheduler_stderr)
    except exceptions.FeatureNotAvailable as exception:
        self.logger.warning('could not parse scheduler output: {}'.format(exception))
        return None
    except Exception as exception:  # pylint: disable=broad-except
        # A failing scheduler parser is only reported as a warning and does not interrupt the parsing of the job
        self.logger.warning('the `parse_output` method of the scheduler excepted: {}'.format(exception))
        return None

    if result is None or isinstance(result, ExitCode):
        return result

    raise ValueError(
        '`{}.parse_output` returned neither an `ExitCode` nor None, but: {}'.format(
            scheduler.__class__.__name__, type(result)
        )
    )

def parse_retrieved_output(self, retrieved_temporary_folder=None):
    """Call the parser plugin, if one was defined in the inputs, and attach the outputs that it registered.

    :param retrieved_temporary_folder: optional path of the folder with temporarily retrieved files, which is
        passed on to the parser under the `retrieved_temporary_folder` keyword when it is truthy
    :return: `None` if no parser is defined, otherwise the value returned by the parser, unless one of its outputs
        was rejected, in which case the `ERROR_INVALID_OUTPUT` exit code of the process is returned
    :raises ValueError: if the resulting value is neither an `ExitCode` nor `None`
    """
    parser_class = self.node.get_parser_class()

    if parser_class is None:
        return None

    parser = parser_class(self.node)
    keyword_arguments = parser.get_outputs_for_parsing()

    if retrieved_temporary_folder:
        keyword_arguments['retrieved_temporary_folder'] = retrieved_temporary_folder

    result = parser.parse(**keyword_arguments)

    for label, output in parser.outputs.items():
        try:
            self.out(label, output)
        except ValueError as exception:
            # The first invalid output replaces the result of the parser and the remaining outputs are skipped
            self.logger.error('invalid value {} specified with label {}: {}'.format(output, label, exception))
            result = self.exit_codes.ERROR_INVALID_OUTPUT  # pylint: disable=no-member
            break

    if result is None or isinstance(result, ExitCode):
        return result

    raise ValueError(
        '`{}.parse` returned neither an `ExitCode` nor None, but: {}'.format(parser_class.__name__, type(result))
    )

def presubmit(self, folder):
Expand Down
12 changes: 12 additions & 0 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,15 @@ def _parse_kill_output(self, retval, stdout, stderr):
:return: True if everything seems ok, False otherwise.
"""

def parse_output(self, detailed_job_info, stdout, stderr):
    """Parse the output of the scheduler.

    This implementation always raises; the message states that output parsing is not available for the class.

    :param detailed_job_info: dictionary with the output returned by the `Scheduler.get_detailed_job_info` command.
        This should contain the keys `retval`, `stdout` and `stderr` corresponding to the return value, stdout and
        stderr returned by the accounting command executed for a specific job id.
    :param stdout: string with the output written by the scheduler to stdout
    :param stderr: string with the output written by the scheduler to stderr
    :return: None or an instance of `aiida.engine.processes.exit_code.ExitCode`
    :raises aiida.common.exceptions.FeatureNotAvailable: always, naming the class of this instance
    """
    class_name = self.__class__.__name__
    message = 'output parsing is not available for `{}`'.format(class_name)
    raise exceptions.FeatureNotAvailable(message)
2 changes: 2 additions & 0 deletions docs/source/howto/plugin_codes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ It checks:

AiiDA stores the exit code returned by the |parse| method on the calculation node that is being parsed, from where it can then be inspected further down the line.
The Topics section on :ref:`defining processes <topics:processes:usage:defining>` provides more details on exit codes.
Note that scheduler plugins can also implement parsing of the output generated by the job scheduler and in the case of problems can set an exit code.
The Topics section on :ref:`scheduler exit codes <topics:calculations:usage:calcjobs:scheduler-errors>` explains how they can be inspected inside an output parser and how they can optionally be overridden.


.. todo::
Expand Down
47 changes: 47 additions & 0 deletions docs/source/topics/calculations/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,50 @@ However, we can give you some guidelines:
If you were to store all this data in the database, it would become unnecessarily bloated, because it is unlikely that you would ever have to query for this data.
Instead these array type data nodes store the bulk of their content in the repository.
This way you still keep the data and therewith the provenance of your calculations, while keeping your database lean and fast!
.. _topics:calculations:usage:calcjobs:scheduler-errors:
Scheduler errors
----------------
Besides the output parsers, the scheduler plugins can also provide parsing of the output generated by the job scheduler, by implementing the :meth:`~aiida.schedulers.scheduler.Scheduler.parse_output` method.
If the scheduler plugin has implemented this method, the output generated by the scheduler, written to the stdout and stderr file descriptors as well as the output of the detailed job info command, is parsed.
If the parser detects a known problem, such as an out-of-memory (OOM) error, the corresponding exit code will already be set on the calculation job node.
The output parser, if defined in the inputs, can inspect the exit status on the node and decide to keep it or override it with a different, potentially more useful, exit code.
.. code:: python
class SomeParser(Parser):
def parse(self, **kwargs):
"""Parse the contents of the output files retrieved in the `FolderData`."""
if self.node.exit_status is not None:
# If an exit status is already set on the node, that means the
# scheduler plugin detected a problem.
return
Note that in the example given above, the parser returns immediately if it finds that the scheduler has already detected a problem.
Since it returns `None`, the exit code of the scheduler will be kept and will be the final exit code of the calculation job.
However, the parser does not have to immediately return.
It can still try to parse some of the retrieved output, if there is any.
If it finds a more specific problem than the generic scheduler error, it can always return an exit code of its own to override it.
The parser can even return ``ExitCode(0)`` to have the calculation marked as successfully finished, despite the scheduler having determined that there was a problem.
The following table summarizes the possible scenarios of the scheduler parser and output parser returning an exit code and what the final resulting exit code will be that is set on the node:
+------------------------------------------------------------------------------------+-----------------------+-----------------------+-----------------------+
| **Scenario** | **Scheduler result** | **Retrieved result** | **Final result** |
+====================================================================================+=======================+=======================+=======================+
| Neither parser found any problem. | ``None`` | ``None`` | ``ExitCode(0)`` |
+------------------------------------------------------------------------------------+-----------------------+-----------------------+-----------------------+
| Scheduler parser found an issue, | ``ExitCode(100)`` | ``None`` | ``ExitCode(100)`` |
| but output parser does not override. | | | |
+------------------------------------------------------------------------------------+-----------------------+-----------------------+-----------------------+
| Only output parser found a problem. | ``None`` | ``ExitCode(400)`` | ``ExitCode(400)`` |
+------------------------------------------------------------------------------------+-----------------------+-----------------------+-----------------------+
| Scheduler parser found an issue, but the output parser overrides with a more | ``ExitCode(100)`` | ``ExitCode(400)`` | ``ExitCode(400)`` |
| specific error code. | | | |
+------------------------------------------------------------------------------------+-----------------------+-----------------------+-----------------------+
| Scheduler found issue but output parser overrides saying that despite that the | ``ExitCode(100)`` | ``ExitCode(0)`` | ``ExitCode(0)`` |
| calculation should be considered finished successfully. | | | |
+------------------------------------------------------------------------------------+-----------------------+-----------------------+-----------------------+
8 changes: 5 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,23 @@ def generate_calc_job():
to it, into which the raw input files will have been written.
"""

def _generate_calc_job(folder, entry_point_name, inputs=None):
def _generate_calc_job(folder, entry_point_name, inputs=None, return_process=False):
"""Fixture to generate a mock `CalcInfo` for testing calculation jobs."""
from aiida.engine.utils import instantiate_process
from aiida.manage.manager import get_manager
from aiida.plugins import CalculationFactory

inputs = inputs or {}
manager = get_manager()
runner = manager.get_runner()

process_class = CalculationFactory(entry_point_name)
process = instantiate_process(runner, process_class, **inputs)

calc_info = process.prepare_for_submission(folder)
if return_process:
return process

return calc_info
return process.prepare_for_submission(folder)

return _generate_calc_job

Expand Down
Loading

0 comments on commit e72a4a3

Please sign in to comment.