diff --git a/src/ophyd_async/epics/pmac/_pmacIO.py b/src/ophyd_async/epics/pmac/_pmacIO.py index ee269a9c5d..712f2b0828 100644 --- a/src/ophyd_async/epics/pmac/_pmacIO.py +++ b/src/ophyd_async/epics/pmac/_pmacIO.py @@ -3,7 +3,7 @@ from ophyd_async.core import DeviceVector, StandardReadable, SubsetEnum -from ..signal.signal import epics_signal_r, epics_signal_rw +from ..signal import epics_signal_r, epics_signal_rw class Pmac(StandardReadable): diff --git a/src/ophyd_async/epics/pmac/_pmacTrajectory.py b/src/ophyd_async/epics/pmac/_pmacTrajectory.py index 9251f7b37a..f6d2773e6e 100644 --- a/src/ophyd_async/epics/pmac/_pmacTrajectory.py +++ b/src/ophyd_async/epics/pmac/_pmacTrajectory.py @@ -6,18 +6,21 @@ from scanspec.specs import Frames, Path, Spec from velocity_profile import velocityprofile as vp -from ophyd_async.core import TriggerLogic -from ophyd_async.core.async_status import AsyncStatus, WatchableAsyncStatus -from ophyd_async.core.signal import observe_value -from ophyd_async.core.utils import WatcherUpdate -from ophyd_async.epics.motion import Motor +from ophyd_async.core import ( + AsyncStatus, + TriggerLogic, + WatchableAsyncStatus, + WatcherUpdate, + observe_value, +) +from ophyd_async.epics import motor from ophyd_async.epics.pmac import Pmac TICK_S = 0.000001 class PmacTrajInfo(BaseModel): - spec: Spec[Motor] = Field() + spec: Spec[motor.Motor] = Field() class PmacTrajectoryTriggerLogic(TriggerLogic[PmacTrajInfo]): @@ -44,7 +47,7 @@ async def prepare(self, value: PmacTrajInfo): positions: dict[int, npt.NDArray[np.float64]] = {} velocities: dict[int, npt.NDArray[np.float64]] = {} time_array: npt.NDArray[np.float64] = [] - cs_axes: dict[Motor, int] = {} + cs_axes: dict[motor.Motor, int] = {} # Which Axes are in use? scan_axes = chunk.axes() @@ -205,7 +208,7 @@ async def complete(self): if percent >= 100: break - async def get_cs_info(self, motor: Motor) -> tuple[str, int]: + async def get_cs_info(self, motor: motor.Motor) -> tuple[str, int]: output_link = await motor.output_link.get_value() # Split "@asyn(PORT,num)" into ["PORT", "num"] split = output_link.split("(")[1].rstrip(")").split(",") @@ -214,7 +217,7 @@ async def get_cs_info(self, motor: Motor) -> tuple[str, int]: cs_index = int(split[1].strip()) - 1 return cs_port, cs_index - def _calculate_gaps(self, chunk: Frames[Motor]): + def _calculate_gaps(self, chunk: Frames[motor.Motor]): inds = np.argwhere(chunk.gap) if len(inds) == 0: return len(chunk) @@ -223,7 +226,7 @@ def _calculate_gaps(self, chunk: Frames[Motor]): async def ramp_up_velocity_pos( - motor: Motor, + motor: motor.Motor, start_velocity: float, end_velocity: float, ramp_time: int = None, @@ -244,7 +247,7 @@ async def ramp_up_velocity_pos( async def make_velocity_profile( - axis: Motor, + axis: motor.Motor, v1: float, v2: float, distance: float, @@ -269,7 +272,7 @@ async def make_velocity_profile( return p -async def get_gap_profile(chunk: Frames[Motor], gap: int): +async def get_gap_profile(chunk: Frames[motor.Motor], gap: int): # Work out the velocity profiles of how to move to the start # Turnaround can't be less than 2 ms prev_point = gap - 1 @@ -301,7 +304,7 @@ async def get_gap_profile(chunk: Frames[Motor], gap: int): async def profile_between_points( - chunk: Frames[Motor], + chunk: Frames[motor.Motor], gap: int, min_time: float = 0.002, min_interval: float = 0.002, @@ -377,7 +380,7 @@ async def profile_between_points( async def point_velocities( - chunk: Frames[Motor], index: int, entry: bool = True + chunk: Frames[motor.Motor], index: int, entry: bool = True ) -> dict[str, float]: """Find the velocities of each axis over the entry/exit of current point""" velocities = {} @@ -412,11 +415,11 @@ async def point_velocities( async def calculate_profile_from_velocities( - chunk: Frames[Motor], - time_arrays: dict[Motor, npt.NDArray[np.float64]], - velocity_arrays: dict[Motor, npt.NDArray[np.float64]], - current_positions: dict[Motor, npt.NDArray[np.float64]], -) -> list[dict[Motor, npt.NDArray[np.float64]]]: + chunk: Frames[motor.Motor], + time_arrays: dict[motor.Motor, npt.NDArray[np.float64]], + velocity_arrays: dict[motor.Motor, npt.NDArray[np.float64]], + current_positions: dict[motor.Motor, npt.NDArray[np.float64]], +) -> list[dict[motor.Motor, npt.NDArray[np.float64]]]: # at this point we have time/velocity arrays with 2-4 values for each # axis. Each time represents a (instantaneous) change in acceleration. # We want to translate this into a move profile (time/position). @@ -441,8 +444,8 @@ async def calculate_profile_from_velocities( time_intervals.append(t - prev_time) prev_time = t # generate the profile positions in a temporary dict: - turnaround_profile: dict[Motor, npt.NDArray[np.float64]] = {} - turnaround_velocity: dict[Motor, npt.NDArray[np.float64]] = {} + turnaround_profile: dict[motor.Motor, npt.NDArray[np.float64]] = {} + turnaround_velocity: dict[motor.Motor, npt.NDArray[np.float64]] = {} # Do this for each axis' velocity and time arrays for axis in chunk.axes(): diff --git a/tests/epics/pmac/test_pmac.py b/tests/epics/pmac/test_pmac.py index 4f013f99d1..da7a1fa6bc 100644 --- a/tests/epics/pmac/test_pmac.py +++ b/tests/epics/pmac/test_pmac.py @@ -2,14 +2,14 @@ from scanspec.specs import Line, fly from ophyd_async.core import DeviceCollector, set_mock_value -from ophyd_async.epics.motion import Motor +from ophyd_async.epics import motor from ophyd_async.epics.pmac import Pmac, PmacTrajectoryTriggerLogic, PmacTrajInfo @pytest.fixture async def sim_x_motor(): async with DeviceCollector(mock=True): - sim_motor = Motor("BLxxI-MO-STAGE-01:X", name="sim_x_motor") + sim_motor = motor.Motor("BLxxI-MO-STAGE-01:X", name="sim_x_motor") set_mock_value(sim_motor.motor_egu, "mm") set_mock_value(sim_motor.precision, 3) @@ -24,7 +24,7 @@ async def sim_x_motor(): @pytest.fixture async def sim_y_motor(): async with DeviceCollector(mock=True): - sim_motor = Motor("BLxxI-MO-STAGE-01:Y", name="sim_x_motor") + sim_motor = motor.Motor("BLxxI-MO-STAGE-01:Y", name="sim_x_motor") set_mock_value(sim_motor.motor_egu, "mm") set_mock_value(sim_motor.precision, 3)