diff --git a/src/ophyd_async/sim/demo/_sim_motor.py b/src/ophyd_async/sim/demo/_sim_motor.py index f1c7f043df..309a852a76 100644 --- a/src/ophyd_async/sim/demo/_sim_motor.py +++ b/src/ophyd_async/sim/demo/_sim_motor.py @@ -2,6 +2,7 @@ import contextlib import time +import numpy as np from bluesky.protocols import Movable, Stoppable from ophyd_async.core import ( @@ -44,22 +45,20 @@ def __init__(self, name="", instant=True) -> None: async def _move(self, old_position: float, new_position: float, move_time: float): start = time.monotonic() - distance = abs(new_position - old_position) - while True: - time_elapsed = round(time.monotonic() - start, 2) - - # update position based on time elapsed - if time_elapsed >= move_time: - # successfully reached our target position - self._user_readback_set(new_position) - break - else: - current_position = old_position + distance * time_elapsed / move_time - - self._user_readback_set(current_position) - - # 10hz update loop - await asyncio.sleep(0.1) + # Make an array of relative update times at 10Hz intervals + update_times = np.arange(0.1, move_time, 0.1) + # With the end position appended + update_times = np.concatenate((update_times, [move_time])) + # Interpolate the [old, new] position array with those update times + new_positions = np.interp( + update_times, [0, move_time], [old_position, new_position] + ) + for update_time, new_position in zip(update_times, new_positions, strict=True): + # Calculate how long to wait to get there + relative_time = time.monotonic() - start + await asyncio.sleep(update_time - relative_time) + # Update the readback position + self._user_readback_set(new_position) @WatchableAsyncStatus.wrap async def set(self, value: float): @@ -75,22 +74,25 @@ async def set(self, value: float): self.velocity.get_value(), ) # If zero velocity, do instant move - move_time = abs(new_position - old_position) / velocity if velocity else 0 - self._move_status = AsyncStatus( - self._move(old_position, new_position, move_time) - ) - # If stop is called then this will raise a CancelledError, ignore it - with contextlib.suppress(asyncio.CancelledError): - async for current_position in observe_value( - self.user_readback, done_status=self._move_status - ): - yield WatcherUpdate( - current=current_position, - initial=old_position, - target=new_position, - name=self.name, - unit=units, - ) + if velocity == 0: + self._user_readback_set(new_position) + else: + move_time = abs(new_position - old_position) / velocity + self._move_status = AsyncStatus( + self._move(old_position, new_position, move_time) + ) + # If stop is called then this will raise a CancelledError, ignore it + with contextlib.suppress(asyncio.CancelledError): + async for current_position in observe_value( + self.user_readback, done_status=self._move_status + ): + yield WatcherUpdate( + current=current_position, + initial=old_position, + target=new_position, + name=self.name, + unit=units, + ) if not self._set_success: raise RuntimeError("Motor was stopped") diff --git a/tests/sim/demo/test_sim_motor.py b/tests/sim/demo/test_sim_motor.py index 9a8ac75a6a..bb92697062 100644 --- a/tests/sim/demo/test_sim_motor.py +++ b/tests/sim/demo/test_sim_motor.py @@ -1,6 +1,7 @@ import asyncio import time +import pytest from bluesky.plans import spiral_square from bluesky.run_engine import RunEngine @@ -38,6 +39,43 @@ async def test_slow_move(): assert elapsed < 1 +async def test_negative_move(): + m1 = SimMotor("M1", instant=False) + await m1.connect() + assert await m1.user_readback.get_value() == 0.0 + status = m1.set(-0.19) + updates = [] + status.watch(lambda **kwargs: updates.append(kwargs)) + await status + assert await m1.user_readback.get_value() == -0.19 + assert updates == [ + { + "current": 0.0, + "initial": 0.0, + "name": "M1", + "target": -0.19, + "time_elapsed": pytest.approx(0, abs=0.05), + "unit": "mm", + }, + { + "current": -0.1, + "initial": 0.0, + "name": "M1", + "target": -0.19, + "time_elapsed": pytest.approx(0.1, abs=0.05), + "unit": "mm", + }, + { + "current": -0.19, + "initial": 0.0, + "name": "M1", + "target": -0.19, + "time_elapsed": pytest.approx(0.19, abs=0.05), + "unit": "mm", + }, + ] + + async def test_stop(): async with DeviceCollector(): m1 = SimMotor("M1", instant=False)