Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutter tests and make PSS shutter more flexible #857

Merged
merged 13 commits into from
May 18, 2023
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ Enhancements

* Add ``fb_epid`` database support from the optics module.
* Add guide *How to interrupt/stop/abort a running plan & recover to safe settings*.
* Add ``close_pv`` & ``open_pv`` kwargs to ``ApsPssShutter``

Maintenance
------------

* Add unit tests for shutters.
* Set ``kind`` attribute to add plugin to ad.read_attrs list.
* Clear ``PVPositionerSoftDone``'s setpoint & readback subscriptions at exit.

Expand Down
100 changes: 93 additions & 7 deletions apstools/devices/shutters.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class ShutterBase(Device):
(constant) Text reported by ``state`` when not open or closed.
cannot move to this position
(default = "unknown")

name
*str* :
(kwarg, required) object's canonical name
"""

# fmt: off
Expand Down Expand Up @@ -145,9 +149,7 @@ def state(self):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.valid_open_values = list(map(self.lowerCaseString, self.valid_open_values))
self.valid_close_values = list(
map(self.lowerCaseString, self.valid_close_values)
)
self.valid_close_values = list(map(self.lowerCaseString, self.valid_close_values))

@property
def isOpen(self):
Expand Down Expand Up @@ -264,6 +266,10 @@ class OneSignalShutter(ShutterBase):
one communication channel to use. See the
``ApsPssShutter`` as an example.

name
*str* :
(kwarg, required) object's canonical name

See ``ShutterBase`` for more parameters.

EXAMPLE
Expand Down Expand Up @@ -370,9 +376,30 @@ class ApsPssShutter(ShutterBase):
shutter motion. Change this as desired. Advise if this
default should be changed.

PARAMETERS

prefix
*str* :
EPICS PV prefix

name
*str* :
(kwarg, required) object's canonical name

close_pv
*str* :
(kwarg, optional) Name of EPICS PV to close the shutter.
If ``None``, defaults to ``"{prefix}Close"``.

open_pv
*str* :
(kwarg, optional) Name of EPICS PV to open the shutter.
If ``None``, defaults to ``"{prefix}Open"``.

EXAMPLE::

shutter_a = ApsPssShutter("2bma:A_shutter:", name="shutter")
shutter_a.wait_for_connection()

shutter_a.open()
shutter_a.close()
Expand Down Expand Up @@ -403,11 +430,16 @@ def in_a_plan(shutter):

# bo records that reset after a short time, set to 1 to move
# note: upper-case first characters here (unique to 9-ID)?
open_signal = Component(EpicsSignal, "Open")
close_signal = Component(EpicsSignal, "Close")
open_signal = FormattedComponent(EpicsSignal, "{self.open_pv}")
close_signal = FormattedComponent(EpicsSignal, "{self.close_pv}")

delay_s = 1.2 # allow time for shutter to move

def __init__(self, prefix, *args, close_pv=None, open_pv=None, **kwargs):
self.open_pv = open_pv or f"{prefix}Open"
self.close_pv = close_pv or f"{prefix}Close"
super().__init__(prefix, *args, **kwargs)

@property
def state(self):
"""is shutter "open", "close", or "unknown"?"""
Expand Down Expand Up @@ -451,6 +483,20 @@ class ApsPssShutterWithStatus(ApsPssShutter):
* a separate status PV tells if the shutter is open or closed
(see :func:`ApsPssShutter()` for alternative)

PARAMETERS

prefix
*str* :
EPICS PV prefix

state_pv
*str* :
Name of EPICS PV that provides shutter's current state.

name
*str* :
(kwarg, required) object's canonical name

EXAMPLE::

A_shutter = ApsPssShutterWithStatus(
Expand All @@ -461,6 +507,8 @@ class ApsPssShutterWithStatus(ApsPssShutter):
"2bma:B_shutter:",
"PA:02BM:STA_B_SBS_OPEN_PL",
name="B_shutter")
A_shutter.wait_for_connection()
B_shutter.wait_for_connection()

A_shutter.open()
A_shutter.close()
Expand Down Expand Up @@ -521,19 +569,24 @@ def wait_for_state(self, target, timeout=10, poll_s=0.01):

PARAMETERS

(kwarg, optional) Name of EPICS PV to close the shutter.
If ``None``, defaults to ``"{prefix}Close"``.

target
*[str]* :
list of strings containing acceptable values

timeout
*non-negative number* :
maximum amount of time (seconds) to wait for PSS state to reach target
(kwarg, optional) Maximum amount of time (seconds) to wait for PSS
state to reach target. If ``None``, defaults to ``10``.

poll_s
*non-negative number* :
Time to wait (seconds) in first polling cycle.
(kwarg, optional) Time to wait (seconds) in first polling cycle.
After first poll, this will be increased by ``_poll_factor_``
up to a maximum time of ``_poll_s_max_``.
If ``None``, defaults to ``0.01``.
"""
if timeout is not None:
expiration = time.time() + max(timeout, 0) # ensure non-negative timeout
Expand Down Expand Up @@ -594,6 +647,16 @@ class SimulatedApsPssShutterWithStatus(ApsPssShutterWithStatus):

.. index:: Ophyd Device; SimulatedApsPssShutterWithStatus

PARAMETERS

prefix
*str* :
EPICS PV prefix

name
*str* :
(kwarg, required) object's canonical name

EXAMPLE::

sim = SimulatedApsPssShutterWithStatus(name="sim")
Expand Down Expand Up @@ -650,9 +713,20 @@ class EpicsMotorShutter(OneSignalShutter):

.. index:: Ophyd Device; EpicsMotorShutter

PARAMETERS

prefix
*str* :
EPICS PV prefix

name
*str* :
(kwarg, required) object's canonical name

EXAMPLE::

tomo_shutter = EpicsMotorShutter("2bma:m23", name="tomo_shutter")
tomo_shutter.wait_for_connection()
tomo_shutter.close_value = 1.0 # default
tomo_shutter.open_value = 0.0 # default
tomo_shutter.tolerance = 0.01 # default
Expand Down Expand Up @@ -715,9 +789,20 @@ class EpicsOnOffShutter(OneSignalShutter):
The current position is determined by comparing the value of the control
with the expected open and close values.

PARAMETERS

prefix
*str* :
EPICS PV prefix

name
*str* :
(kwarg, required) object's canonical name

EXAMPLE::

bit_shutter = EpicsOnOffShutter("2bma:bit1", name="bit_shutter")
bit_shutter.wait_for_connection()
bit_shutter.close_value = 0 # default
bit_shutter.open_value = 1 # default
bit_shutter.open()
Expand All @@ -732,6 +817,7 @@ def planA():

signal = Component(EpicsSignal, "")


# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: jemian@anl.gov
Expand Down
2 changes: 1 addition & 1 deletion apstools/devices/tests/test_lakeshores.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
Hardware is not available so test with best efforts
"""

from ...tests import IOC
from ..lakeshore_controllers import LakeShore336Device
from ..lakeshore_controllers import LakeShore340Device

IOC = "gp:"
PV_PREFIX = f"phony:{IOC}lakeshore:"


Expand Down
118 changes: 118 additions & 0 deletions apstools/devices/tests/test_shutters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""
Test the shutter classes.
"""

import pytest
from ophyd import Component
from ophyd import EpicsSignal

from ...tests import IOC
from ...tests import timed_pause
from .. import shutters

PV_BIT = f"{IOC}gp:bit20"
PV_MOTOR = f"{IOC}m16"


def set_and_assert_signal(signal, value):
if signal.get() != value:
signal.put(value)
timed_pause()
assert signal.get() == value


def operate_shutter(shutter):
shutter.open()
timed_pause()
assert shutter.state in ("open", "unknown")
if shutter.state != "unknown":
assert shutter.isOpen
assert not shutter.isClosed

shutter.close()
timed_pause()
assert shutter.state in ("close", "unknown")
if shutter.state != "unknown":
assert not shutter.isOpen
assert shutter.isClosed


@pytest.mark.parametrize("close_pv", [None, "a:close:pv", f"{IOC}XYZ:CLOSE_EPICS.VAL"])
@pytest.mark.parametrize("open_pv", [None, "that:open:pvname", f"{IOC}ABC:OPEN_EPICS.VAL"])
def test_ApsPssShutter(close_pv, open_pv):
"""
Structure tests only.

Cannot connect or operate! We don't have the APS when testing!
"""
prefix = "TEST:"
shutter = shutters.ApsPssShutter(prefix, name="shutter", close_pv=close_pv, open_pv=open_pv)
assert not shutter.connected

close_pv = close_pv or f"{prefix}Close"
open_pv = open_pv or f"{prefix}Open"
assert shutter.open_signal.pvname == open_pv
assert shutter.close_signal.pvname == close_pv


@pytest.mark.parametrize("state_pv", [None, "the:state:pv", "the:state:EPICS_PV", f"{IOC}hutch_BEAM_PRESENT"])
@pytest.mark.parametrize("close_pv", [None, "a:close:pv", f"{IOC}XYZ:CLOSE_EPICS.VAL"])
@pytest.mark.parametrize("open_pv", [None, "that:open:pvname", f"{IOC}ABC:OPEN_EPICS.VAL"])
def test_ApsPssShutterWithStatus(state_pv, close_pv, open_pv):
"""
Structure tests only.

Cannot connect or operate! We don't have the APS when testing!
"""
prefix = "TEST:"
shutter = shutters.ApsPssShutterWithStatus(
prefix, state_pv, name="shutter", close_pv=close_pv, open_pv=open_pv
)
assert not shutter.connected

assert shutter.pss_state.pvname == str(state_pv)


def test_EpicsMotorShutter():
shutter = shutters.EpicsMotorShutter(PV_MOTOR, name="shutter")
shutter.wait_for_connection()
shutter.close_value = 1.0 # default
shutter.open_value = 0.0 # default
shutter.tolerance = 0.01 # default

# put the shutter into known state
set_and_assert_signal(shutter.signal.user_setpoint, shutter.close_value)
operate_shutter(shutter)


def test_EpicsOnOffShutter():
shutter = shutters.EpicsOnOffShutter(PV_BIT, name="shutter")
shutter.close_value = 0 # default
shutter.open_value = 1 # default

# put the shutter into known state
set_and_assert_signal(shutter.signal, shutter.close_value)
operate_shutter(shutter)


def test_OneEpicsSignalShutter():
class OneEpicsSignalShutter(shutters.OneSignalShutter):
signal = Component(EpicsSignal, "")

shutter = OneEpicsSignalShutter(PV_BIT, name="shutter")
shutter.wait_for_connection()
assert shutter.connected

# put the shutter into known state
set_and_assert_signal(shutter.signal, shutter.close_value)
operate_shutter(shutter)


def test_OneSignalShutter():
shutter = shutters.OneSignalShutter(name="shutter")
operate_shutter(shutter)


def test_SimulatedApsPssShutterWithStatus():
shutter = shutters.SimulatedApsPssShutterWithStatus(name="shutter")
operate_shutter(shutter)