Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unreliable simulations wrapper #484

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Changelog
=========

- Add tools to handle simulation errors and limit simulation time
- Use kernel copy to avoid pickle issue and allow BOLFI parallelisation with non-default kernel
- Restrict matplotlib version < 3.9 for compatibility with GPy
- Add option to use additive or multiplicative adjustment in any acquisition method
Expand Down
3 changes: 3 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ Other
.. autosummary::
elfi.tools.vectorize
elfi.tools.external_operation
elfi.tools.unreliable_operation



Expand Down Expand Up @@ -338,3 +339,5 @@ Other
.. automethod:: elfi.tools.vectorize

.. automethod:: elfi.tools.external_operation

.. automethod:: elfi.tools.unreliable_operation
114 changes: 113 additions & 1 deletion elfi/model/tools.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""This module contains tools for ELFI graphs."""

__all__ = ['vectorize', 'external_operation', 'unreliable_operation']

import logging
import signal
import subprocess
from functools import partial

import numpy as np

from elfi.utils import get_sub_seed, is_array

__all__ = ['vectorize', 'external_operation']
logger = logging.getLogger(__name__)


def run_vectorized(operation, *inputs, constants=None, dtype=None, batch_size=None, **kwargs):
Expand Down Expand Up @@ -284,3 +288,111 @@ def external_operation(command,
prepare_inputs=prepare_inputs,
stdout=stdout,
subprocess_kwargs=subprocess_kwargs)


def run_with_recovery(operation, known_errors, *inputs, error_output=None, **kwargs):
    """Run the operation with error recovery.

    Helper that returns a predetermined output when an accepted error occurs in the operation.
    This tool is still experimental and may not work in all cases.

    Parameters
    ----------
    operation : callable
        Operation to be executed.
    known_errors : Exception, tuple, list, set or None
        Accepted error class or a collection of accepted error classes. With None, no error
        is accepted and every exception raised by the operation propagates to the caller.
    inputs
        Positional arguments for the operation.
    error_output : any, optional
        Output to return when an accepted error occurs. Defaults to None.
    kwargs
        Keyword arguments for the operation. When they include a truthy `batch_size`, the
        output returned on error is an array with `error_output` repeated `batch_size` times.

    Returns
    -------
    output : any
        Operation output or error_output if operation failed with an accepted error.

    """
    # An except clause only accepts an exception class or a tuple of classes. Anything else
    # raises a TypeError at the moment an error is caught, which would hide the actual error.
    if known_errors is None:
        known_errors = ()
    elif isinstance(known_errors, (list, set, frozenset)):
        known_errors = tuple(known_errors)

    try:
        return operation(*inputs, **kwargs)
    except known_errors as e:
        logger.warning("Exception occurred: %s", e)
        batch_size = kwargs.get('batch_size')
        if batch_size:
            return np.array([error_output] * batch_size)
        return error_output


def run_with_time_limit(operation, time_limit, *inputs, error_output=None, **kwargs):
    """Run the operation with time limit.

    Helper that terminates the operation at time limit and returns a predetermined output.
    The limit is enforced with the SIGALRM signal and the real-time interval timer. The
    SIGALRM handler that was active before the call is restored when the call ends.
    This tool is still experimental and may not work in all cases.

    Parameters
    ----------
    operation : callable
        Operation to be executed.
    time_limit : int or float
        Operation time limit in seconds.
    inputs
        Positional arguments for the operation.
    error_output : any, optional
        Output to return when the operation exceeds time limit. Defaults to None.
    kwargs
        Keyword arguments for the operation. When they include a truthy `batch_size`, the
        output returned on timeout is an array with `error_output` repeated `batch_size` times.

    Returns
    -------
    output : any
        Operation output or error_output if operation exceeded time limit. A TimeoutError
        raised by the operation itself is handled the same way as an exceeded time limit.

    """
    def timeout_handler(signum, frame):
        raise TimeoutError

    # Keep the handler that was active before this call so it can be put back afterwards.
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        # Unlike signal.alarm, the interval timer also accepts fractional seconds.
        signal.setitimer(signal.ITIMER_REAL, time_limit)
        output = operation(*inputs, **kwargs)
    except TimeoutError:
        logger.warning("Operation exceeded time limit.")
        batch_size = kwargs.get('batch_size')
        output = np.array([error_output] * batch_size) if batch_size else error_output
    finally:
        # Cancel the timer first so that a late alarm cannot reach the restored handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # A previous handler of None means it was not installed from Python and cannot be
        # passed back to signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
    return output


def unreliable_operation(operation,
                         known_errors=None,
                         time_limit=None,
                         error_output=None):
    """Wrap an operation so that it can run with a time limit and error recovery.

    Each option is applied only when its argument is not None. When both options are left
    at None, the operation is returned as it was given.
    This tool is still experimental and may not work in all cases.

    Parameters
    ----------
    operation : callable
        Operation to be executed.
    known_errors : Exception or tuple, optional
        Accepted errors. Defaults to None.
    time_limit : int, optional
        Operation time limit in seconds. Defaults to None.
    error_output : any, optional
        Output to return when an accepted error occurs or the operation exceeds time limit.
        Defaults to None.

    Returns
    -------
    operation : callable
        ELFI compatible operation that can be used e.g. as a simulator.

    """
    use_time_limit = time_limit is not None
    use_recovery = known_errors is not None

    wrapped = operation
    # The time limit is the inner layer and the error recovery is wrapped around it.
    if use_time_limit:
        wrapped = partial(run_with_time_limit, wrapped, time_limit, error_output=error_output)
    if use_recovery:
        wrapped = partial(run_with_recovery, wrapped, known_errors, error_output=error_output)
    return wrapped
32 changes: 32 additions & 0 deletions tests/unit/test_tools.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pickle
import time

import numpy as np
import pytest
Expand Down Expand Up @@ -86,6 +87,37 @@ def test_vectorized_and_external_combined():
assert len(np.unique(g[:, 3]) == 1)


def test_unreliable_operation():
    """Check error recovery and time limit handling of elfi.tools.unreliable_operation."""
    def simulator(param, error=None, sleep=0, random_state=None):
        # Raise the requested error or return the output after the requested delay.
        if error is not None:
            raise error
        time.sleep(sleep)
        return param * np.linspace(0, 1, 5)

    # Single accepted error class with the default error output.
    errors = RuntimeError
    sim = elfi.tools.unreliable_operation(simulator, known_errors=errors)
    assert np.all(sim(2, error=None) == simulator(2))
    assert sim(2, error=RuntimeError("Example runtime error.")) is None

    # Custom error output.
    sim = elfi.tools.unreliable_operation(simulator, known_errors=errors, error_output=np.zeros(5))
    assert np.all(sim(2, error=RuntimeError("Example runtime error.")) == np.zeros(5))

    # Tuple of accepted errors; ZeroDivisionError is a subclass of ArithmeticError.
    errors = (RuntimeError, ArithmeticError)
    sim = elfi.tools.unreliable_operation(simulator, known_errors=errors)
    assert sim(2, error=RuntimeError("Example runtime error.")) is None
    assert sim(2, error=ZeroDivisionError("Example arithmetic error.")) is None

    # Exception does not cover KeyboardInterrupt, which must still propagate.
    errors = Exception
    sim = elfi.tools.unreliable_operation(simulator, known_errors=errors)
    assert sim(2, error=RuntimeError("Example runtime error.")) is None
    with pytest.raises(KeyboardInterrupt):
        sim(2, error=KeyboardInterrupt)

    # Time limit only: a simulation that sleeps past the limit returns the error output.
    sim = elfi.tools.unreliable_operation(simulator, time_limit=1)
    assert np.all(sim(2, sleep=0) == simulator(2))
    assert sim(2, sleep=2) is None


def test_progress_bar(ma2):
thresholds = [.5, .2]
N = 1000
Expand Down
Loading