Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add way to escape 'Another set in progress'. #901

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions ophyd/areadetector/filestore_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MyDetector(PerkinElmerDetector, SingleTrigger): # for example

det = MyDetector(...)
"""

import logging
import os
import uuid
Expand Down
33 changes: 31 additions & 2 deletions ophyd/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class Signal(OphydObject):
rtolerance : any, optional
The relative tolerance associated with the value
"""

SUB_VALUE = "value"
SUB_META = "meta"
_default_sub = SUB_VALUE
Expand Down Expand Up @@ -124,6 +125,9 @@ def __init__(
self._destroyed = False

self._set_thread = None
self._poison_pill = None
self._set_thread_finalizer = None

self._tolerance = tolerance
# self.tolerance is a property
self.rtolerance = rtolerance
Expand Down Expand Up @@ -326,9 +330,11 @@ def set(self, value, *, timeout=None, settle_time=None, **kwargs):
kwargs,
)

poison_pill = threading.Event()

def set_thread():
try:
self._set_and_wait(value, timeout, **kwargs)
self._set_and_wait(value, timeout, poison_pill=poison_pill, **kwargs)
except TimeoutError:
success = False
self.log.warning(
Expand Down Expand Up @@ -372,21 +378,44 @@ def set_thread():
th = self._set_thread
# these two must be in this order to avoid a race condition
self._set_thread = None
self._set_thread_finalizer = None
self._poison_pill = None
st._finished(success=success)
del th

if self._set_thread is not None:
raise RuntimeError(
"Another set() call is still in progress " f"for {self.name}"
f"Another set() call is still in progress for {self.name}. "
"If this is due to some transient failure, verify that the "
"device is configured the way you expect, and use clear_set() "
"to ignore and abandon the previous set() operation."
)

st = Status(self)
self._status = st
self._set_thread = self.cl.thread_class(target=set_thread)
self._set_thread.daemon = True
self._set_thread.start()
self._poison_pill = poison_pill
# If we get gc-ed, stop the thread. This helps ensure that the process
# exits cleanly without dangling threads.
self._set_thread_finalizer = weakref.finalize(self, poison_pill.set)
return self._status

def clear_set(self):
"""
Escape 'Another set in progress'.
"""
if self._poison_pill is None:
# Nothing to do
return
self._poison_pill.set() # Break the polling loop in set_and_wait.
self._set_thread.join() # Wait for that to take effect.
warnings.warn(
"A previous set() operation is being ignored. Only do this "
"when debugging or recovering from a hardware failure."
)

@property
def value(self):
"""The signal's value"""
Expand Down
1 change: 1 addition & 0 deletions ophyd/tests/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""PV names for tests"""

import os

epics_base = os.environ.get("EPICS_BASE", "/usr/lib/epics")
Expand Down
92 changes: 92 additions & 0 deletions ophyd/tests/test_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from ophyd.status import wait
from ophyd.utils import AlarmSeverity, AlarmStatus, ReadOnlyError
from ophyd.utils.epics_pvs import AbandonedSet, _set_and_wait

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -695,3 +696,94 @@ def test_import_ro_signal_class():
from ophyd.signal import SignalRO as SignalRoFromModule

assert SignalRoFromPkg is SignalRoFromModule


def test_signal_poison_pill():
sig = Signal(name="sig", value=1)
sig.wait_for_connection()

assert sig._poison_pill is None

st = sig.set(28, settle_time=0.1) # give it a brief time to finish
# poison_pill = threading.Event() when SIGNAL.set() starts
assert sig._poison_pill is not None
assert isinstance(sig._poison_pill, threading.Event)
wait(st)
# poison_pill = None when SIGNAL.set() finalizes
assert sig._poison_pill is None


def test_signal_set_thread_finalizer():
sig = Signal(name="sig", value=1)
sig.wait_for_connection()

assert sig._set_thread_finalizer is None

st = sig.set(28, settle_time=0.1) # give it a brief time to finish
assert not st.done
assert sig._set_thread_finalizer is not None
wait(st)
# set_thread_finalizer = None when SIGNAL.set() finalizes
assert sig._set_thread_finalizer is None


def test_signal_another_call_to_set_in_progress():
sig = Signal(name="sig", value=1)
sig.wait_for_connection()

st1 = sig.set(28, settle_time=0.2)
# trap the RuntimeError when "Another set() call " ...
with pytest.raises(RuntimeError):
# TODO: verify the message text startswith?
assert not st1.done
st2 = sig.set(-1)

assert not st1.done
assert not st1.success

wait(st1)
assert st1.done
assert st1.success

with pytest.raises(NameError):
assert st2 is not None


def test_signal_clear_set():
class HackSignal(Signal):
def put(self, value, **kwargs):
...

def _super_put(self, value, **kwargs):
super().put(value, **kwargs)

sig = HackSignal(name="sig", value=1)
sig._super_put(1)

st1 = sig.set(28)
with pytest.raises(RuntimeError):
sig.set(-1)

# call SIGNAL.clear_set() and trap the warning after RuntimeError is raised
with pytest.warns(UserWarning):
sig.clear_set()
sig._super_put(28)
wait(st1)
assert sig.get() == 28


def test_epicssignal_abandonedset():
pill = threading.Event()

class BrokenPutSignal(Signal):
"""put(value) that ignores input"""

def put(self, value, **kwargs):
super().put(self._readback, **kwargs)
pill.set()

sig = BrokenPutSignal(name="sig", value=1)

with pytest.raises(AbandonedSet):
prjemian marked this conversation as resolved.
Show resolved Hide resolved
_set_and_wait(sig, sig.get() + 1, timeout=20, poison_pill=pill)
assert sig.get() == 1
38 changes: 34 additions & 4 deletions ophyd/utils/epics_pvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,20 @@ def wrapper(self, *args, **kwargs):
return wrapper


class AbandonedSet(OpException):
...


def _set_and_wait(
signal, val, poll_time=0.01, timeout=10, rtol=None, atol=None, **kwargs
signal,
val,
poll_time=0.01,
timeout=10,
rtol=None,
atol=None,
*,
poison_pill=None,
**kwargs,
):
"""Set a signal to a value and wait until it reads correctly.

Expand All @@ -228,6 +240,8 @@ def _set_and_wait(
allowed relative tolerance between the readback and setpoint values
atol : float, optional
allowed absolute tolerance between the readback and setpoint values
poison_pill : threading.Event
When set, give up and raise AbandonedSet.
kwargs :
additional keyword arguments will be passed directly into the
underlying "signal.put" call.
Expand All @@ -238,11 +252,19 @@ def _set_and_wait(
"""
signal.put(val, **kwargs)
_wait_for_value(
signal, val, poll_time=poll_time, timeout=timeout, rtol=rtol, atol=atol
signal,
val,
poll_time=poll_time,
timeout=timeout,
rtol=rtol,
atol=atol,
poison_pill=poison_pill,
)


def _wait_for_value(signal, val, poll_time=0.01, timeout=10, rtol=None, atol=None):
def _wait_for_value(
signal, val, poll_time=0.01, timeout=10, rtol=None, atol=None, *, poison_pill
):
"""Wait for a signal to match a value.

For floating point values, it is strongly recommended to set a tolerance.
Expand Down Expand Up @@ -305,7 +327,15 @@ def _wait_for_value(signal, val, poll_time=0.01, timeout=10, rtol=None, atol=Non
val,
within_str,
)
ttime.sleep(poll_time)
# Sleep.
if poison_pill is None:
ttime.sleep(poll_time)
elif poison_pill.wait(poll_time):
# This set operation has been abandoned.
raise AbandonedSet(
f"The signal {signal} was set to {val} but it does not seem "
"to have finished. We are no longer watching for it."
)
if poll_time < 0.1:
poll_time *= 2 # logarithmic back-off
current_value = signal.get(**get_kwargs)
Expand Down
Loading