Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds types to Lifecycle Objects #1338

Merged
merged 39 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1198c24
Add types to wait_for_message.py
InvincibleRMC Jul 31, 2024
a819955
Add copyright
InvincibleRMC Jul 31, 2024
6b52d12
re-run CI
InvincibleRMC Jul 31, 2024
a6072c1
re-run CI
InvincibleRMC Jul 31, 2024
37c87da
Merge branch 'rolling' into wait-for-message
InvincibleRMC Aug 9, 2024
e39049a
move Handles into _rclpy_pybind11
InvincibleRMC Aug 10, 2024
33cf136
Move Handles into type stubs:
InvincibleRMC Aug 10, 2024
22edf24
Move Handles into type stubs
InvincibleRMC Aug 10, 2024
90f1518
move [] into string
InvincibleRMC Aug 10, 2024
1019bcc
fix imports
InvincibleRMC Aug 10, 2024
7f22be9
remove extra line
InvincibleRMC Aug 10, 2024
a61090f
puy _rclpy.Publisher in quotes
InvincibleRMC Aug 10, 2024
c18a10a
fix capitalization
InvincibleRMC Aug 10, 2024
e45fd69
Add EventHandle Constructor
InvincibleRMC Aug 10, 2024
e010378
Use RuntimeError for context
InvincibleRMC Aug 21, 2024
05d5094
Merge branch 'rolling' into wait-for-message
mergify[bot] Aug 21, 2024
47c43f5
Add TYPE_CHECKING import
InvincibleRMC Aug 22, 2024
19e7afc
start lifecycle
InvincibleRMC Aug 22, 2024
82930c6
Add types to lifecylce objects
InvincibleRMC Aug 22, 2024
ad431f2
move types into string literals
InvincibleRMC Aug 22, 2024
9b55cec
use unpack instead
InvincibleRMC Aug 22, 2024
79ad276
flake8
InvincibleRMC Aug 22, 2024
31f6b9b
move Context into string
InvincibleRMC Aug 22, 2024
31a7734
update create_lifecycle_publisher
InvincibleRMC Aug 22, 2024
94098d8
test Unpack tuple
InvincibleRMC Aug 22, 2024
e570f00
merge
InvincibleRMC Aug 23, 2024
30d22fe
use Enum
InvincibleRMC Aug 23, 2024
d0e2786
use RuntimeError
InvincibleRMC Aug 23, 2024
693595a
Merge branch 'rolling' into TransitionCallbackReturn
mergify[bot] Aug 26, 2024
b2dca14
Mpve Unpack import
InvincibleRMC Aug 26, 2024
f6316f9
move type inside TYPE_CHECKING block
InvincibleRMC Aug 27, 2024
8efb1a1
flake8
InvincibleRMC Aug 27, 2024
2b06516
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Aug 30, 2024
7663c24
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Sep 4, 2024
54f7783
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Sep 4, 2024
883980a
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Sep 7, 2024
f0cff7d
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Sep 11, 2024
e4917d3
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Sep 13, 2024
9eabc30
Merge branch 'rolling' into TransitionCallbackReturn
InvincibleRMC Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 83 additions & 14 deletions rclpy/rclpy/impl/_rclpy_pybind11.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ from __future__ import annotations

from enum import Enum, IntEnum
from types import TracebackType
from typing import Any, Generic, Literal, overload, Sequence, TypedDict
from typing import Any, Generic, Literal, overload, Sequence, TypeAlias, TypedDict

from rclpy.clock import JumpHandle
from rclpy.clock_type import ClockType
Expand Down Expand Up @@ -101,21 +101,23 @@ class rcl_duration_t:
nanoseconds: int


class rcl_subscription_event_type_t(IntEnum):
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED: int
RCL_SUBSCRIPTION_LIVELINESS_CHANGED: int
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS: int
RCL_SUBSCRIPTION_MESSAGE_LOST: int
RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE: int
RCL_SUBSCRIPTION_MATCHED: int
class rcl_subscription_event_type_t(Enum):
_value_: int
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = ...
RCL_SUBSCRIPTION_LIVELINESS_CHANGED = ...
RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS = ...
RCL_SUBSCRIPTION_MESSAGE_LOST = ...
RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE = ...
RCL_SUBSCRIPTION_MATCHED = ...


class rcl_publisher_event_type_t(IntEnum):
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED: int
RCL_PUBLISHER_LIVELINESS_LOST: int
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS: int
RCL_PUBLISHER_INCOMPATIBLE_TYPE: int
RCL_PUBLISHER_MATCHED: int
class rcl_publisher_event_type_t(Enum):
_value_: int
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = ...
RCL_PUBLISHER_LIVELINESS_LOST = ...
RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS = ...
RCL_PUBLISHER_INCOMPATIBLE_TYPE = ...
RCL_PUBLISHER_MATCHED = ...


class EventHandle(Destroyable):
Expand All @@ -135,6 +137,73 @@ class EventHandle(Destroyable):
"""Get pending data from a ready event."""


LifecycleStateMachineState: TypeAlias = tuple[int, str]


class LifecycleStateMachine(Destroyable):

def __init__(self, node: Node, enable_com_interface: bool) -> None: ...

@property
def initialized(self) -> bool:
"""Check if state machine is initialized."""

@property
def current_state(self) -> LifecycleStateMachineState:
"""Get the current state machine state."""

@property
def available_states(self) -> list[LifecycleStateMachineState]:
"""Get the available states."""

@property
def available_transitions(self) -> list[tuple[int, str, int, str, int, str]]:
"""Get the available transitions."""

@property
def transition_graph(self) -> list[tuple[int, str, int, str, int, str]]:
"""Get the transition graph."""

def get_transition_by_label(self, label: str) -> int:
"""Get the transition id from a transition label."""

def trigger_transition_by_id(self, transition_id: int, publish_update: bool) -> None:
"""Trigger a transition by transition id."""

def trigger_transition_by_label(self, label: str, publish_update: bool) -> None:
"""Trigger a transition by label."""

@property
def service_change_state(self) -> Service:
"""Get the change state service."""

@property
def service_get_state(self) -> Service:
"""Get the get state service."""

@property
def service_get_available_states(self) -> Service:
"""Get the get available states service."""

@property
def service_get_available_transitions(self) -> Service:
"""Get the get available transitions service."""

@property
def service_get_transition_graph(self) -> Service:
"""Get the get transition graph service."""


class TransitionCallbackReturnType(Enum):
_value_: int
SUCCESS = ...
FAILURE = ...
ERROR = ...

def to_label(self) -> str:
"""Convert the transition callback return code to a transition label."""


class GuardCondition(Destroyable):

def __init__(self, context: Context) -> None: ...
Expand Down
50 changes: 36 additions & 14 deletions rclpy/rclpy/lifecycle/managed_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,85 @@
# limitations under the License.

from functools import wraps
from typing import Any, Callable, Dict, List, Optional, overload, TYPE_CHECKING, Union

from ..impl.implementation_singleton import rclpy_implementation as _rclpy

if TYPE_CHECKING:
from typing import TypeAlias
from rclpy.lifecycle.node import LifecycleState

TransitionCallbackReturn = _rclpy.TransitionCallbackReturnType

TransitionCallbackReturn: 'TypeAlias' = _rclpy.TransitionCallbackReturnType


class ManagedEntity:

def on_configure(self, state) -> TransitionCallbackReturn:
def on_configure(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle configure transition request."""
return TransitionCallbackReturn.SUCCESS

def on_cleanup(self, state) -> TransitionCallbackReturn:
def on_cleanup(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle cleanup transition request."""
return TransitionCallbackReturn.SUCCESS

def on_shutdown(self, state) -> TransitionCallbackReturn:
def on_shutdown(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle shutdown transition request."""
return TransitionCallbackReturn.SUCCESS

def on_activate(self, state) -> TransitionCallbackReturn:
def on_activate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle activate transition request."""
return TransitionCallbackReturn.SUCCESS

def on_deactivate(self, state) -> TransitionCallbackReturn:
def on_deactivate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle deactivate transition request."""
return TransitionCallbackReturn.SUCCESS

def on_error(self, state) -> TransitionCallbackReturn:
def on_error(self, state: 'LifecycleState') -> TransitionCallbackReturn:
"""Handle error transition request."""
return TransitionCallbackReturn.SUCCESS


class SimpleManagedEntity(ManagedEntity):
"""A simple managed entity that only sets a flag when activated/deactivated."""

def __init__(self):
def __init__(self) -> None:
self._enabled = False

def on_activate(self, state) -> TransitionCallbackReturn:
def on_activate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
self._enabled = True
return TransitionCallbackReturn.SUCCESS

def on_deactivate(self, state) -> TransitionCallbackReturn:
def on_deactivate(self, state: 'LifecycleState') -> TransitionCallbackReturn:
self._enabled = False
return TransitionCallbackReturn.SUCCESS

@property
def is_activated(self):
def is_activated(self) -> bool:
return self._enabled

@staticmethod
def when_enabled(wrapped=None, *, when_not_enabled=None):
def decorator(wrapped):
@overload
def when_enabled(wrapped: None, *,
when_not_enabled: Optional[Callable[..., None]] = None
) -> Callable[[Callable[..., None]], Callable[..., None]]: ...

@staticmethod
@overload
def when_enabled(wrapped: Callable[..., None], *,
when_not_enabled: Optional[Callable[..., None]] = None
) -> Callable[..., None]: ...

@staticmethod
def when_enabled(wrapped: Optional[Callable[..., None]] = None, *,
when_not_enabled: Optional[Callable[..., None]] = None) -> Union[
Callable[..., None],
Callable[[Callable[..., None]], Callable[..., None]]
]:
def decorator(wrapped: Callable[..., None]) -> Callable[..., None]:
@wraps(wrapped)
def only_when_enabled_wrapper(self: SimpleManagedEntity, *args, **kwargs):
def only_when_enabled_wrapper(self: SimpleManagedEntity, *args: List[Any],
**kwargs: Dict[str, Any]) -> None:
if not self._enabled:
if when_not_enabled is not None:
when_not_enabled()
Expand Down
Loading