Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add types to timer.py #1260

Merged
merged 9 commits into from
Aug 5, 2024
2 changes: 1 addition & 1 deletion rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1760,7 +1760,7 @@ def create_service(
def create_timer(
self,
timer_period_sec: float,
callback: Callable[[TimerInfo], None],
callback: Union[Callable[[], None], Callable[[TimerInfo], None], None],
callback_group: Optional[CallbackGroup] = None,
clock: Optional[Clock] = None,
autostart: bool = True,
Expand Down
77 changes: 57 additions & 20 deletions rclpy/rclpy/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,52 @@
from types import TracebackType
from typing import Callable
from typing import Optional
from typing import Protocol
from typing import Type
from typing import Union

from rclpy.callback_groups import CallbackGroup
from rclpy.clock import Clock, ClockType
from rclpy.clock import Clock
from rclpy.clock_type import ClockType
from rclpy.context import Context
from rclpy.destroyable import DestroyableType
from rclpy.exceptions import InvalidHandle, ROSInterruptException
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.time import Time
from rclpy.utilities import get_default_context


class TimerHandle(DestroyableType, Protocol):
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
"""Type alias of _rclpy.Timer."""

def reset_timer(self) -> None:
...

def is_timer_ready(self) -> bool:
...

def call_timer(self) -> None:
...

def change_timer_period(self, period_nsec: int) -> None:
...

def time_until_next_call(self) -> Optional[int]:
...

def time_since_last_call(self) -> int:
...

def get_timer_period(self) -> int:
...

def cancel_timer(self) -> None:
...

def is_timer_canceled(self) -> bool:
...


class TimerInfo:
"""
Represents a timer call information.
Expand Down Expand Up @@ -64,7 +99,7 @@ class Timer:

def __init__(
self,
callback: Callable,
callback: Union[Callable[[], None], Callable[[TimerInfo], None], None],
callback_group: CallbackGroup,
timer_period_ns: int,
clock: Clock,
Expand Down Expand Up @@ -94,8 +129,10 @@ def __init__(
"""
self._context = get_default_context() if context is None else context
self._clock = clock
if self._context.handle is None:
raise RuntimeError('Context must be initialized before create a TimerHandle.')
with self._clock.handle, self._context.handle:
self.__timer = _rclpy.Timer(
self.__timer: TimerHandle = _rclpy.Timer(
self._clock.handle, self._context.handle, timer_period_ns, autostart)
self.timer_period_ns = timer_period_ns
self.callback = callback
Expand All @@ -104,10 +141,10 @@ def __init__(
self._executor_event = False

@property
def handle(self):
def handle(self) -> TimerHandle:
return self.__timer

def destroy(self):
def destroy(self) -> None:
"""
Destroy a container for a ROS timer.

Expand All @@ -117,44 +154,44 @@ def destroy(self):
self.__timer.destroy_when_not_in_use()

@property
def clock(self):
def clock(self) -> Clock:
return self._clock

@property
def timer_period_ns(self):
def timer_period_ns(self) -> int:
with self.__timer:
val = self.__timer.get_timer_period()
self.__timer_period_ns = val
return val

@timer_period_ns.setter
def timer_period_ns(self, value):
def timer_period_ns(self, value: int) -> None:
val = int(value)
with self.__timer:
self.__timer.change_timer_period(val)
self.__timer_period_ns = val

def is_ready(self):
def is_ready(self) -> bool:
with self.__timer:
return self.__timer.is_timer_ready()

def is_canceled(self):
def is_canceled(self) -> bool:
with self.__timer:
return self.__timer.is_timer_canceled()

def cancel(self):
def cancel(self) -> None:
with self.__timer:
self.__timer.cancel_timer()

def reset(self):
def reset(self) -> None:
with self.__timer:
self.__timer.reset_timer()

def time_since_last_call(self):
def time_since_last_call(self) -> int:
with self.__timer:
return self.__timer.time_since_last_call()

def time_until_next_call(self):
def time_until_next_call(self) -> Optional[int]:
with self.__timer:
return self.__timer.time_until_next_call()

Expand All @@ -173,7 +210,7 @@ def __exit__(
class Rate:
"""A utility for sleeping at a fixed rate."""

def __init__(self, timer: Timer, *, context):
def __init__(self, timer: Timer, *, context: Context):
"""
Create a Rate.

Expand All @@ -196,11 +233,11 @@ def __init__(self, timer: Timer, *, context):
# Set event when ROS is shutdown
context.on_shutdown(self._on_shutdown)

def _on_shutdown(self):
def _on_shutdown(self) -> None:
self._is_shutdown = True
self.destroy()

def destroy(self):
def destroy(self) -> None:
"""
Destroy a container for a ROS rate.

Expand All @@ -210,7 +247,7 @@ def destroy(self):
self._is_destroyed = True
self._event.set()

def _presleep(self):
def _presleep(self) -> None:
if self._is_shutdown:
raise ROSInterruptException()
if self._is_destroyed:
Expand All @@ -221,7 +258,7 @@ def _presleep(self):
with self._lock:
self._num_sleepers += 1

def _postsleep(self):
def _postsleep(self) -> None:
with self._lock:
self._num_sleepers -= 1
if self._num_sleepers == 0:
Expand All @@ -230,7 +267,7 @@ def _postsleep(self):
self.destroy()
raise ROSInterruptException()

def sleep(self):
def sleep(self) -> None:
"""
Block until timer triggers.

Expand Down