
Fix the unhandled failure mechanism of the BaseRestartWorkChain #4155

Merged
2 changes: 1 addition & 1 deletion aiida/engine/processes/workchains/restart.py
@@ -240,7 +240,7 @@ def inspect_process(self):  # pylint: disable=inconsistent-return-statements,too

         # Here either the process finished successful or at least one handler returned a report so it can no longer be
         # considered to be an unhandled failed process and therefore we reset the flag
-        self.ctx.unhandled_failure = True
+        self.ctx.unhandled_failure = False

         # If at least one handler returned a report, the action depends on its exit code and that of the process itself
         if last_report:
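The mechanism this one-line fix repairs can be sketched in plain Python. This is a hypothetical model, not the aiida API: a failed child that no handler dealt with arms a flag, a second consecutive unhandled failure aborts the loop, and any success or handled failure must reset the flag, which is exactly what changing `True` to `False` restores.

```python
# Simplified model of the unhandled-failure logic in BaseRestartWorkChain.
# All names here are invented for illustration; only the control flow mirrors
# the fixed code.

ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE = 402  # hypothetical exit status


class RestartLoop:
    """Tracks consecutive unhandled failures of child processes."""

    def __init__(self):
        self.unhandled_failure = False

    def inspect(self, child_failed, handled):
        """Return an exit status to abort the loop, or None to continue."""
        if child_failed and not handled:
            if self.unhandled_failure:
                # Second unhandled failure in a row: give up.
                return ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE
            self.unhandled_failure = True
            return None

        # Success or a handled failure: reset the flag. The bug set it to
        # True at this point, so the work chain could never recover.
        self.unhandled_failure = False
        return None


loop = RestartLoop()
assert loop.inspect(child_failed=True, handled=False) is None
assert loop.unhandled_failure is True
# A handled failure resets the flag instead of keeping the chain armed.
assert loop.inspect(child_failed=True, handled=True) is None
assert loop.unhandled_failure is False
```

With the bug in place, the last assertion would fail: the flag stayed `True` after a handled failure, so the very next unhandled failure aborted the work chain prematurely.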
53 changes: 53 additions & 0 deletions tests/conftest.py
@@ -91,3 +91,56 @@ def _generate_calc_job(folder, entry_point_name, inputs=None):
return calc_info

return _generate_calc_job


@pytest.fixture
def generate_work_chain():
"""Generate an instance of a `WorkChain`."""

def _generate_work_chain(entry_point, inputs=None):
"""Generate an instance of a `WorkChain` with the given entry point and inputs.

:param entry_point: entry point name of the work chain subclass.
:param inputs: inputs to be passed to process construction.
:return: a `WorkChain` instance.
"""
from aiida.engine.utils import instantiate_process
from aiida.manage.manager import get_manager
from aiida.plugins import WorkflowFactory

inputs = inputs or {}
process_class = WorkflowFactory(entry_point) if isinstance(entry_point, str) else entry_point
runner = get_manager().get_runner()
process = instantiate_process(runner, process_class, **inputs)
Comment on lines +113 to +114

Member: Sorry to be a nuisance, but could I get some context on these couple of lines? I have only read about work chains, mostly the basic idea, but never really ran one. I tried going to the source in manager.py and utils.py, but I feel I need some extra general information on how the engine works to understand these functions and classes.

Contributor (author): This is how a Process instance is created. Normally this is done automatically by the engine when you call run or submit, but in this case I need more control: instead of having the engine actually run it, for these tests I just want the process instance so I can "run" parts of it myself.

return process

return _generate_work_chain


@pytest.fixture
def generate_calculation_node():
"""Generate an instance of a `CalculationNode`."""
from aiida.engine import ProcessState

def _generate_calculation_node(process_state=ProcessState.FINISHED, exit_status=None):
"""Generate an instance of a `CalculationNode`..

:param process_state: state to set
:param exit_status: optional exit status, will be set to `0` if `process_state` is `ProcessState.FINISHED`
:return: a `CalculationNode` instance.
"""
from aiida.orm import CalculationNode

if process_state is ProcessState.FINISHED and exit_status is None:
exit_status = 0

node = CalculationNode()
node.set_process_state(process_state)

if exit_status is not None:
node.set_exit_status(exit_status)

return node

return _generate_calculation_node
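The pattern the author describes, building the process object directly so a test can drive its lifecycle by hand, can be mimicked in plain Python. This is a hypothetical analogue, not the aiida API; `FakeWorkChain` and this `instantiate_process` are invented stand-ins for the real classes the fixture uses.

```python
# Toy analogue of what the generate_work_chain fixture provides: construct the
# process without an engine, then call lifecycle methods like setup() manually
# and assert on internal state.

class FakeWorkChain:
    """Stand-in for a WorkChain subclass (invented for illustration)."""

    def __init__(self, inputs):
        self.inputs = inputs
        self.ctx = type('Context', (), {})()  # attribute bag, like self.ctx

    def setup(self):
        self.ctx.children = []


def instantiate_process(process_class, **inputs):
    """Construct the process without submitting it, as the fixture does."""
    return process_class(inputs)


process = instantiate_process(FakeWorkChain, x=1)
process.setup()                        # drive one lifecycle step by hand
assert process.inputs == {'x': 1}
assert process.ctx.children == []      # inspect internal state directly
```

The tests below do exactly this with the real fixture: call `setup()`, populate `ctx.children`, and invoke `inspect_process()` themselves instead of letting the engine run the work chain.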
113 changes: 84 additions & 29 deletions tests/engine/processes/workchains/test_restart.py
@@ -8,44 +8,99 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for `aiida.engine.processes.workchains.restart` module."""
-from aiida.backends.testbase import AiidaTestCase
-from aiida.engine.processes.workchains.restart import BaseRestartWorkChain
-from aiida.engine.processes.workchains.utils import process_handler
+# pylint: disable=invalid-name
+import pytest
+
+from aiida.engine import CalcJob, BaseRestartWorkChain, process_handler, ProcessState, ProcessHandlerReport, ExitCode
Member: So, this is a more general question. I understand that from a core-user perspective, having the important methods available through the higher-level modules instead of the specific ones (i.e. using from aiida.engine import process_handler instead of the complete from aiida.engine.processes.workchains.utils import process_handler) is helpful, because users don't need to know the internal organization to have access to these. However, wouldn't it still be better for us to internally keep using the most specific path possible when importing inside aiida-core? If we need to debug, it would be easier to know the exact location of the functions and classes being imported, and although most IDEs have some sort of automatic search for this, sometimes you need to use a simplified interface or you are just grepping, and it helps to have the information there.

Contributor (author): As you said, most proper IDEs can jump to the source automatically, and the increased brevity is a big plus in this case. The exception, in my opinion, is when you are importing something from a submodule; in that case the relative form from .module import something is better than the absolute one.

Member: Additional point: it's important for us to use the "public" interface as much as possible in the tests, if only to make sure that we don't accidentally break it.

Member (ramirezfranciscof, Jun 9, 2020): Ah, OK. Simultaneously checking in the tests that you are not breaking the public interface is a good point; I didn't think about that. I still think there are advantages to knowing the exact location of the code you are using, and (at least for the non-test internals) it is not clear to me that the brevity in the header of the file (where there is not much logic to obfuscate) outweighs this.

Contributor (author): I really don't think using from aiida.orm.nodes.data.int import Int every time instead of from aiida.orm import Int does the understanding of the code any favors.


-class TestBaseRestartWorkChain(AiidaTestCase):
-    """Tests for the `BaseRestartWorkChain` class."""
-
-    @staticmethod
-    def test_is_process_handler():
-        """Test the `BaseRestartWorkChain.is_process_handler` class method."""
-        class SomeWorkChain(BaseRestartWorkChain):
-            """Dummy class."""
+class SomeWorkChain(BaseRestartWorkChain):
+    """Dummy class."""
+    _process_class = CalcJob

Member: What is this? I don't remember needing to set a _process_class property inside work chains... is this new?

Contributor (author): This is just a concept of the BaseRestartWorkChain. Note that this is an implementation of a WorkChain but not a concrete one, i.e., you cannot run a BaseRestartWorkChain itself. It is merely a base class for a very common type of WorkChain: one that runs a CalcJob and deals with errors. So to run an actual base restart work chain, one should subclass it and define this _process_class, which will be the CalcJob that the work chain will launch. Look at the BaseRestartWorkChain docstring for more information.

-            @process_handler()
-            def handler_a(self, node):
-                pass
+    @process_handler()
+    def handler_a(self, node):  # pylint: disable=inconsistent-return-statements,no-self-use
+        if node.exit_status == 400:
+            return ProcessHandlerReport()

-            def not_a_handler(self, node):
-                pass
+    def not_a_handler(self, node):
+        pass

-        assert SomeWorkChain.is_process_handler('handler_a')
-        assert not SomeWorkChain.is_process_handler('not_a_handler')
-        assert not SomeWorkChain.is_process_handler('unexisting_method')
-
-    @staticmethod
-    def test_get_process_handler():
-        """Test the `BaseRestartWorkChain.get_process_handlers` class method."""
+def test_is_process_handler():
+    """Test the `BaseRestartWorkChain.is_process_handler` class method."""
+    assert SomeWorkChain.is_process_handler('handler_a')
+    assert not SomeWorkChain.is_process_handler('not_a_handler')
+    assert not SomeWorkChain.is_process_handler('unexisting_method')

-        class SomeWorkChain(BaseRestartWorkChain):
-            """Dummy class."""
-
-            @process_handler
-            def handler_a(self, node):
-                pass
+def test_get_process_handler():
+    """Test the `BaseRestartWorkChain.get_process_handlers` class method."""
+    assert [handler.__name__ for handler in SomeWorkChain.get_process_handlers()] == ['handler_a']

-            def not_a_handler(self, node):
-                pass
-
-        assert [handler.__name__ for handler in SomeWorkChain.get_process_handlers()] == ['handler_a']
@pytest.mark.usefixtures('clear_database_before_test')
def test_excepted_process(generate_work_chain, generate_calculation_node):
"""Test that the workchain aborts if the sub process was excepted."""
process = generate_work_chain(SomeWorkChain, {})
process.setup()
process.ctx.children = [generate_calculation_node(ProcessState.EXCEPTED)]
assert process.inspect_process() == BaseRestartWorkChain.exit_codes.ERROR_SUB_PROCESS_EXCEPTED # pylint: disable=no-member


@pytest.mark.usefixtures('clear_database_before_test')
def test_killed_process(generate_work_chain, generate_calculation_node):
"""Test that the workchain aborts if the sub process was killed."""
process = generate_work_chain(SomeWorkChain, {})
process.setup()
process.ctx.children = [generate_calculation_node(ProcessState.KILLED)]
assert process.inspect_process() == BaseRestartWorkChain.exit_codes.ERROR_SUB_PROCESS_KILLED # pylint: disable=no-member


@pytest.mark.usefixtures('clear_database_before_test')
def test_unhandled_failure(generate_work_chain, generate_calculation_node):
"""Test the unhandled failure mechanism.

The workchain should be aborted if there are two consecutive failed sub processes that went unhandled. We simulate
it by setting `ctx.unhandled_failure` to True and append two failed process nodes in `ctx.children`.
"""
process = generate_work_chain(SomeWorkChain, {})
process.setup()
process.ctx.children = [generate_calculation_node(exit_status=100)]
assert process.inspect_process() is None
assert process.ctx.unhandled_failure is True

process.ctx.children.append(generate_calculation_node(exit_status=100))
assert process.inspect_process() == BaseRestartWorkChain.exit_codes.ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE # pylint: disable=no-member


@pytest.mark.usefixtures('clear_database_before_test')
def test_unhandled_reset_after_success(generate_work_chain, generate_calculation_node):
"""Test `ctx.unhandled_failure` is reset to `False` in `inspect_process` after a successful process."""
process = generate_work_chain(SomeWorkChain, {})
process.setup()
process.ctx.children = [generate_calculation_node(exit_status=100)]
assert process.inspect_process() is None
assert process.ctx.unhandled_failure is True

process.ctx.children.append(generate_calculation_node(exit_status=0))
assert process.inspect_process() is None
assert process.ctx.unhandled_failure is False


@pytest.mark.usefixtures('clear_database_before_test')
def test_unhandled_reset_after_handled(generate_work_chain, generate_calculation_node):
"""Test `ctx.unhandled_failure` is reset to `False` in `inspect_process` after a handled failed process."""
process = generate_work_chain(SomeWorkChain, {})
process.setup()
process.ctx.children = [generate_calculation_node(exit_status=0)]
assert process.inspect_process() is None
assert process.ctx.unhandled_failure is False

# Exit status 400 of the last calculation job will be handled and so should reset the flag
process.ctx.children.append(generate_calculation_node(exit_status=400))
result = process.inspect_process()
assert isinstance(result, ExitCode)
assert result.status == 0
assert process.ctx.unhandled_failure is False
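The design discussed in the review thread above, an abstract base class where each concrete subclass names the process it will run via _process_class and registers handlers with a decorator, can be modeled in a few lines of plain Python. Everything here is an invented toy (FakeCalcJob, handle_out_of_walltime, the decorator internals); only the shape of the pattern follows BaseRestartWorkChain.

```python
# Toy model of the BaseRestartWorkChain pattern: subclasses set _process_class
# and mark handler methods with a decorator; the base class can then discover
# them. Not aiida code; names are hypothetical.

class ProcessHandlerReport:
    """Signals that a handler dealt with the child's failure."""

    def __init__(self, exit_code=0):
        self.exit_code = exit_code


def process_handler(method):
    """Mark a method as a handler so the base class can find it."""
    method.is_handler = True
    return method


class BaseRestart:
    _process_class = None  # concrete subclasses must set this

    @classmethod
    def get_process_handlers(cls):
        # Collect the methods tagged by the decorator, in definition order.
        return [m for m in vars(cls).values() if getattr(m, 'is_handler', False)]


class FakeCalcJob:
    """Stand-in for the CalcJob the subclass would launch."""


class SomeRestart(BaseRestart):
    _process_class = FakeCalcJob

    @process_handler
    def handle_out_of_walltime(self, node):
        # Handle only a specific failure mode of the child process.
        if node.exit_status == 400:
            return ProcessHandlerReport()


assert SomeRestart._process_class is FakeCalcJob
assert [h.__name__ for h in SomeRestart.get_process_handlers()] == ['handle_out_of_walltime']
node = type('Node', (), {'exit_status': 400})()
assert isinstance(SomeRestart().handle_out_of_walltime(node), ProcessHandlerReport)
```

This mirrors why the tests in this PR define SomeWorkChain with `_process_class = CalcJob` and a handler for exit status 400: the base class alone is not runnable, and the handler's returned report is what resets the unhandled-failure flag.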