diff --git a/ophyd/areadetector/filestore_mixins.py b/ophyd/areadetector/filestore_mixins.py index 602dbb201..082a055df 100644 --- a/ophyd/areadetector/filestore_mixins.py +++ b/ophyd/areadetector/filestore_mixins.py @@ -19,6 +19,7 @@ class MyDetector(PerkinElmerDetector, SingleTrigger): # for example det = MyDetector(...) """ + import logging import os import uuid diff --git a/ophyd/signal.py b/ophyd/signal.py index f01180dbd..376c38048 100644 --- a/ophyd/signal.py +++ b/ophyd/signal.py @@ -85,6 +85,7 @@ class Signal(OphydObject): rtolerance : any, optional The relative tolerance associated with the value """ + SUB_VALUE = "value" SUB_META = "meta" _default_sub = SUB_VALUE @@ -124,6 +125,9 @@ def __init__( self._destroyed = False self._set_thread = None + self._poison_pill = None + self._set_thread_finalizer = None + self._tolerance = tolerance # self.tolerance is a property self.rtolerance = rtolerance @@ -326,9 +330,11 @@ def set(self, value, *, timeout=None, settle_time=None, **kwargs): kwargs, ) + poison_pill = threading.Event() + def set_thread(): try: - self._set_and_wait(value, timeout, **kwargs) + self._set_and_wait(value, timeout, poison_pill=poison_pill, **kwargs) except TimeoutError: success = False self.log.warning( @@ -372,12 +378,17 @@ def set_thread(): th = self._set_thread # these two must be in this order to avoid a race condition self._set_thread = None + self._set_thread_finalizer = None + self._poison_pill = None st._finished(success=success) del th if self._set_thread is not None: raise RuntimeError( - "Another set() call is still in progress " f"for {self.name}" + f"Another set() call is still in progress for {self.name}. " + "If this is due to some transient failure, verify that the " + "device is configured the way you expect, and use clear_set() " + "to ignore and abandon the previous set() operation." ) st = Status(self) @@ -385,8 +396,26 @@ def set_thread(): self._set_thread = self.cl.thread_class(target=set_thread) self._set_thread.daemon = True self._set_thread.start() + self._poison_pill = poison_pill + # If we get gc-ed, stop the thread. This helps ensure that the process + # exits cleanly without dangling threads. + self._set_thread_finalizer = weakref.finalize(self, poison_pill.set) return self._status + def clear_set(self): + """ + Escape 'Another set in progress'. + """ + if self._poison_pill is None: + # Nothing to do + return + self._poison_pill.set() # Break the polling loop in set_and_wait. + self._set_thread.join() # Wait for that to take effect. + warnings.warn( + "A previous set() operation is being ignored. Only do this " + "when debugging or recovering from a hardware failure." + ) + @property def value(self): """The signal's value""" diff --git a/ophyd/tests/config.py b/ophyd/tests/config.py index ad6cc15c9..107577cc9 100644 --- a/ophyd/tests/config.py +++ b/ophyd/tests/config.py @@ -1,4 +1,5 @@ """PV names for tests""" + import os epics_base = os.environ.get("EPICS_BASE", "/usr/lib/epics") diff --git a/ophyd/tests/test_signal.py b/ophyd/tests/test_signal.py index f56e9d4cf..451931d9a 100644 --- a/ophyd/tests/test_signal.py +++ b/ophyd/tests/test_signal.py @@ -18,6 +18,7 @@ ) from ophyd.status import wait from ophyd.utils import AlarmSeverity, AlarmStatus, ReadOnlyError +from ophyd.utils.epics_pvs import AbandonedSet, _set_and_wait logger = logging.getLogger(__name__) @@ -695,3 +696,94 @@ def test_import_ro_signal_class(): from ophyd.signal import SignalRO as SignalRoFromModule assert SignalRoFromPkg is SignalRoFromModule + + +def test_signal_poison_pill(): + sig = Signal(name="sig", value=1) + sig.wait_for_connection() + + assert sig._poison_pill is None + + st = sig.set(28, settle_time=0.1) # give it a brief time to finish + # poison_pill = threading.Event() when SIGNAL.set() starts + assert sig._poison_pill is not None + assert isinstance(sig._poison_pill, threading.Event) + wait(st) + # poison_pill = None when SIGNAL.set() finalizes + assert sig._poison_pill is None + + +def test_signal_set_thread_finalizer(): + sig = Signal(name="sig", value=1) + sig.wait_for_connection() + + assert sig._set_thread_finalizer is None + + st = sig.set(28, settle_time=0.1) # give it a brief time to finish + assert not st.done + assert sig._set_thread_finalizer is not None + wait(st) + # set_thread_finalizer = None when SIGNAL.set() finalizes + assert sig._set_thread_finalizer is None + + +def test_signal_another_call_to_set_in_progress(): + sig = Signal(name="sig", value=1) + sig.wait_for_connection() + + st1 = sig.set(28, settle_time=0.2) + # trap the RuntimeError when "Another set() call " ... + with pytest.raises(RuntimeError): + # TODO: verify the message text startswith? + assert not st1.done + st2 = sig.set(-1) + + assert not st1.done + assert not st1.success + + wait(st1) + assert st1.done + assert st1.success + + with pytest.raises(NameError): + assert st2 is not None + + +def test_signal_clear_set(): + class HackSignal(Signal): + def put(self, value, **kwargs): + ... + + def _super_put(self, value, **kwargs): + super().put(value, **kwargs) + + sig = HackSignal(name="sig", value=1) + sig._super_put(1) + + st1 = sig.set(28) + with pytest.raises(RuntimeError): + sig.set(-1) + + # call SIGNAL.clear_set() and trap the warning after RuntimeError is raised + with pytest.warns(UserWarning): + sig.clear_set() + sig._super_put(28) + wait(st1) + assert sig.get() == 28 + + +def test_epicssignal_abandonedset(): + pill = threading.Event() + + class BrokenPutSignal(Signal): + """put(value) that ignores input""" + + def put(self, value, **kwargs): + super().put(self._readback, **kwargs) + pill.set() + + sig = BrokenPutSignal(name="sig", value=1) + + with pytest.raises(AbandonedSet): + _set_and_wait(sig, sig.get() + 1, timeout=20, poison_pill=pill) + assert sig.get() == 1 diff --git a/ophyd/utils/epics_pvs.py b/ophyd/utils/epics_pvs.py index ce3733e0c..eaf9b0b4f 100644 --- a/ophyd/utils/epics_pvs.py +++ b/ophyd/utils/epics_pvs.py @@ -207,8 +207,20 @@ def wrapper(self, *args, **kwargs): return wrapper +class AbandonedSet(OpException): + ... + + def _set_and_wait( - signal, val, poll_time=0.01, timeout=10, rtol=None, atol=None, **kwargs + signal, + val, + poll_time=0.01, + timeout=10, + rtol=None, + atol=None, + *, + poison_pill=None, + **kwargs, ): """Set a signal to a value and wait until it reads correctly. @@ -228,6 +240,8 @@ def _set_and_wait( allowed relative tolerance between the readback and setpoint values atol : float, optional allowed absolute tolerance between the readback and setpoint values + poison_pill : threading.Event + When set, give up and raise AbandonedSet. kwargs : additional keyword arguments will be passed directly into the underlying "signal.put" call. @@ -238,11 +252,19 @@ def _set_and_wait( """ signal.put(val, **kwargs) _wait_for_value( - signal, val, poll_time=poll_time, timeout=timeout, rtol=rtol, atol=atol + signal, + val, + poll_time=poll_time, + timeout=timeout, + rtol=rtol, + atol=atol, + poison_pill=poison_pill, ) -def _wait_for_value(signal, val, poll_time=0.01, timeout=10, rtol=None, atol=None): +def _wait_for_value( + signal, val, poll_time=0.01, timeout=10, rtol=None, atol=None, *, poison_pill +): """Wait for a signal to match a value. For floating point values, it is strongly recommended to set a tolerance. @@ -305,7 +327,15 @@ def _wait_for_value(signal, val, poll_time=0.01, timeout=10, rtol=None, atol=Non val, within_str, ) - ttime.sleep(poll_time) + # Sleep. + if poison_pill is None: + ttime.sleep(poll_time) + elif poison_pill.wait(poll_time): + # This set operation has been abandoned. + raise AbandonedSet( + f"The signal {signal} was set to {val} but it does not seem " + "to have finished. We are no longer watching for it." + ) if poll_time < 0.1: poll_time *= 2 # logarithmic back-off current_value = signal.get(**get_kwargs)