diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index 30ce39b29..f0fde4a8d 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -199,6 +199,7 @@ if(BUILD_TESTING) test/test_parameter.py test/test_parameters_callback.py test/test_qos.py + test/test_qos_event.py test/test_task.py test/test_time_source.py test/test_time.py diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index d46b8e5c4..0afa6f4da 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -52,6 +52,8 @@ from rclpy.qos import qos_profile_parameter_events from rclpy.qos import qos_profile_services_default from rclpy.qos import QoSProfile +from rclpy.qos_event import PublisherEventCallbacks +from rclpy.qos_event import SubscriptionEventCallbacks from rclpy.service import Service from rclpy.subscription import Subscription from rclpy.time_source import TimeSource @@ -696,7 +698,9 @@ def create_publisher( msg_type, topic: str, *, - qos_profile: QoSProfile = qos_profile_default + qos_profile: QoSProfile = qos_profile_default, + callback_group: Optional[CallbackGroup] = None, + event_callbacks: Optional[PublisherEventCallbacks] = None, ) -> Publisher: """ Create a new publisher. @@ -704,8 +708,13 @@ def create_publisher( :param msg_type: The type of ROS messages the publisher will publish. :param topic: The name of the topic the publisher will publish to. :param qos_profile: The quality of service profile to apply to the publisher. + :param callback_group: The callback group for the publisher's event handlers. + If ``None``, then the node's default callback group is used. + :param event_callbacks: User-defined callbacks for middleware events. :return: The new publisher. """ + callback_group = callback_group or self.default_callback_group + # this line imports the typesupport for the message module if not already done check_for_type_support(msg_type) failed = False @@ -721,9 +730,16 @@ def create_publisher( publisher_handle = Handle(publisher_capsule) publisher_handle.requires(self.handle) - publisher = Publisher(publisher_handle, msg_type, topic, qos_profile) + publisher = Publisher( + publisher_handle, msg_type, topic, qos_profile, + event_callbacks=event_callbacks or PublisherEventCallbacks(), + callback_group=callback_group) self.__publishers.append(publisher) self._wake_executor() + + for event_callback in publisher.event_handlers: + self.add_waitable(event_callback) + return publisher def create_subscription( @@ -733,7 +749,8 @@ def create_subscription( callback: Callable[[MsgType], None], *, qos_profile: QoSProfile = qos_profile_default, - callback_group: CallbackGroup = None, + callback_group: Optional[CallbackGroup] = None, + event_callbacks: Optional[SubscriptionEventCallbacks] = None, raw: bool = False ) -> Subscription: """ @@ -746,11 +763,12 @@ def create_subscription( :param qos_profile: The quality of service profile to apply to the subscription. :param callback_group: The callback group for the subscription. If ``None``, then the nodes default callback group is used. + :param event_callbacks: User-defined callbacks for middleware events. :param raw: If ``True``, then received messages will be stored in raw binary representation. """ - if callback_group is None: - callback_group = self.default_callback_group + callback_group = callback_group or self.default_callback_group + # this line imports the typesupport for the message module if not already done check_for_type_support(msg_type) failed = False @@ -768,10 +786,14 @@ def create_subscription( subscription = Subscription( subscription_handle, msg_type, - topic, callback, callback_group, qos_profile, raw) + topic, callback, callback_group, qos_profile, raw, + event_callbacks=event_callbacks or SubscriptionEventCallbacks()) self.__subscriptions.append(subscription) callback_group.add_entity(subscription) - self._wake_executor() + + for event_handler in subscription.event_handlers: + self.add_waitable(event_handler) + return subscription def create_client( diff --git a/rclpy/rclpy/publisher.py b/rclpy/rclpy/publisher.py index df683977e..818f30d98 100644 --- a/rclpy/rclpy/publisher.py +++ b/rclpy/rclpy/publisher.py @@ -14,8 +14,11 @@ from typing import TypeVar +from rclpy.callback_groups import CallbackGroup +from rclpy.handle import Handle from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.qos import QoSProfile +from rclpy.qos_event import PublisherEventCallbacks MsgType = TypeVar('MsgType') @@ -24,10 +27,12 @@ class Publisher: def __init__( self, - publisher_handle, + publisher_handle: Handle, msg_type: MsgType, topic: str, qos_profile: QoSProfile, + event_callbacks: PublisherEventCallbacks, + callback_group: CallbackGroup, ) -> None: """ Create a container for a ROS publisher. @@ -48,6 +53,9 @@ def __init__( self.topic = topic self.qos_profile = qos_profile + self.event_handlers = event_callbacks.create_event_handlers( + callback_group, publisher_handle) + def publish(self, msg: MsgType) -> None: """ Send a message to the topic for the publisher. diff --git a/rclpy/rclpy/qos_event.py b/rclpy/rclpy/qos_event.py new file mode 100644 index 000000000..954e04e8f --- /dev/null +++ b/rclpy/rclpy/qos_event.py @@ -0,0 +1,231 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import IntEnum +from typing import Callable +from typing import List +from typing import NamedTuple +from typing import Optional + +import rclpy +from rclpy.callback_groups import CallbackGroup +from rclpy.handle import Handle +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.waitable import NumberOfEntities +from rclpy.waitable import Waitable + + +class QoSPublisherEventType(IntEnum): + """ + Enum for types of QoS events that a Publisher can receive. + + This enum matches the one defined in rcl/event.h + """ + + RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = 0 + RCL_PUBLISHER_LIVELINESS_LOST = 1 + + +class QoSSubscriptionEventType(IntEnum): + """ + Enum for types of QoS events that a Subscription can receive. + + This enum matches the one defined in rcl/event.h + """ + + RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = 0 + RCL_SUBSCRIPTION_LIVELINESS_CHANGED = 1 + + +""" +Payload type for Subscription Deadline callback. + +Mirrors rmw_requested_deadline_missed_status_t from rmw/types.h +""" +QoSRequestedDeadlineMissedInfo = NamedTuple( + 'QoSRequestedDeadlineMissedInfo', [ + ('total_count', 'int'), + ('total_count_change', 'int'), + ]) + +""" +Payload type for Subscription Liveliness callback. + +Mirrors rmw_liveliness_changed_status_t from rmw/types.h +""" +QoSLivelinessChangedInfo = NamedTuple( + 'QoSLivelinessChangedInfo', [ + ('alive_count', 'int'), + ('not_alive_count', 'int'), + ('alive_count_change', 'int'), + ('not_alive_count_change', 'int'), + ]) + +""" +Payload type for Publisher Deadline callback. + +Mirrors rmw_offered_deadline_missed_status_t from rmw/types.h +""" +QoSOfferedDeadlineMissedInfo = NamedTuple( + 'QoSOfferedDeadlineMissedInfo', [ + ('total_count', 'int'), + ('total_count_change', 'int'), + ]) + +""" +Payload type for Publisher Liveliness callback. + +Mirrors rmw_liveliness_lost_status_t from rmw/types.h +""" +QoSLivelinessLostInfo = NamedTuple( + 'QoSLivelinessLostInfo', [ + ('total_count', 'int'), + ('total_count_change', 'int'), + ]) + + +class QoSEventHandler(Waitable): + """Waitable type to handle QoS events.""" + + def __init__( + self, + *, + callback_group: CallbackGroup, + callback: Callable, + event_type: IntEnum, + parent_handle: Handle, + ): + # Waitable init adds self to callback_group + super().__init__(callback_group) + self.event_type = event_type + self.callback = callback + + self._parent_handle = parent_handle + with parent_handle as parent_capsule: + event_capsule = _rclpy.rclpy_create_event(event_type, parent_capsule) + self._event_handle = Handle(event_capsule) + self._event_handle.requires(self._parent_handle) + self._ready_to_take_data = False + self._event_index = None + + # Start Waitable API + def is_ready(self, wait_set): + """Return True if entities are ready in the wait set.""" + if self._event_index is None: + return False + if _rclpy.rclpy_wait_set_is_ready('event', wait_set, self._event_index): + self._ready_to_take_data = True + return self._ready_to_take_data + + def take_data(self): + """Take stuff from lower level so the wait set doesn't immediately wake again.""" + if self._ready_to_take_data: + self._ready_to_take_data = False + with self._parent_handle as parent_capsule, self._event_handle as event_capsule: + return _rclpy.rclpy_take_event(event_capsule, parent_capsule, self.event_type) + return None + + async def execute(self, taken_data): + """Execute work after data has been taken from a ready wait set.""" + if not taken_data: + return + await rclpy.executors.await_or_execute(self.callback, [taken_data]) + + def get_num_entities(self): + """Return number of each type of entity used.""" + return NumberOfEntities(num_events=1) + + def add_to_wait_set(self, wait_set): + """Add entites to wait set.""" + with self._event_handle as event_capsule: + self._event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, event_capsule) + # End Waitable API + + +class SubscriptionEventCallbacks: + """Container to provide middleware event callbacks for a Subscription.""" + + def __init__( + self, + *, + deadline: Optional[Callable[[QoSRequestedDeadlineMissedInfo], None]] = None, + liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None, + ) -> None: + """ + Constructor. + + :param deadline: A user-defined callback that is called when a topic misses our + requested Deadline. + :param liveliness: A user-defined callback that is called when the Liveliness of + a Publisher on subscribed topic changes. + """ + self.deadline = deadline + self.liveliness = liveliness + + def create_event_handlers( + self, callback_group: CallbackGroup, subscription_handle: Handle, + ) -> List[QoSEventHandler]: + event_handlers = [] + if self.deadline: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.deadline, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED, + parent_handle=subscription_handle)) + if self.liveliness: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.liveliness, + event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED, + parent_handle=subscription_handle)) + return event_handlers + + +class PublisherEventCallbacks: + """Container to provide middleware event callbacks for a Publisher.""" + + def __init__( + self, + *, + deadline: Optional[Callable[[QoSOfferedDeadlineMissedInfo], None]] = None, + liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None + ) -> None: + """ + Constructor. + + :param deadline: A user-defined callback that is called when the Publisher misses + its offered Deadline. + :param liveliness: A user-defined callback that is called when this Publisher + fails to signal its Liveliness and is reported as not-alive. + """ + self.deadline = deadline + self.liveliness = liveliness + + def create_event_handlers( + self, callback_group: CallbackGroup, publisher_handle: Handle, + ) -> List[QoSEventHandler]: + event_handlers = [] + if self.deadline: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.deadline, + event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED, + parent_handle=publisher_handle)) + if self.liveliness: + event_handlers.append(QoSEventHandler( + callback_group=callback_group, + callback=self.liveliness, + event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST, + parent_handle=publisher_handle)) + return event_handlers diff --git a/rclpy/rclpy/subscription.py b/rclpy/rclpy/subscription.py index 25596a290..f9c8b9061 100644 --- a/rclpy/rclpy/subscription.py +++ b/rclpy/rclpy/subscription.py @@ -16,7 +16,10 @@ from typing import TypeVar from rclpy.callback_groups import CallbackGroup +from rclpy.handle import Handle from rclpy.qos import QoSProfile +from rclpy.qos_event import SubscriptionEventCallbacks + # For documentation only MsgType = TypeVar('MsgType') @@ -26,13 +29,14 @@ class Subscription: def __init__( self, - subscription_handle, + subscription_handle: Handle, msg_type: MsgType, topic: str, callback: Callable, callback_group: CallbackGroup, qos_profile: QoSProfile, - raw: bool + raw: bool, + event_callbacks: SubscriptionEventCallbacks, ) -> None: """ Create a container for a ROS subscription. @@ -62,6 +66,9 @@ def __init__( self.qos_profile = qos_profile self.raw = raw + self.event_handlers = event_callbacks.create_event_handlers( + callback_group, subscription_handle) + @property def handle(self): return self.__handle diff --git a/rclpy/src/rclpy/_rclpy.c b/rclpy/src/rclpy/_rclpy.c index 2e8ff89de..a141df4c5 100644 --- a/rclpy/src/rclpy/_rclpy.c +++ b/rclpy/src/rclpy/_rclpy.c @@ -37,39 +37,7 @@ #include <rosidl_generator_c/message_type_support_struct.h> #include "rclpy_common/common.h" - -typedef struct -{ - // Important: a pointer to a structure is also a pointer to its first member. - // The subscription must be first in the struct to compare sub.handle.pointer to an address - // in a wait set. - rcl_subscription_t subscription; - rcl_node_t * node; -} rclpy_subscription_t; - -typedef struct -{ - rcl_publisher_t publisher; - rcl_node_t * node; -} rclpy_publisher_t; - -typedef struct -{ - // Important: a pointer to a structure is also a pointer to its first member. - // The client must be first in the struct to compare cli.handle.pointer to an address - // in a wait set. - rcl_client_t client; - rcl_node_t * node; -} rclpy_client_t; - -typedef struct -{ - // Important: a pointer to a structure is also a pointer to its first member. - // The service must be first in the struct to compare srv.handle.pointer to an address - // in a wait set. - rcl_service_t service; - rcl_node_t * node; -} rclpy_service_t; +#include "./_rclpy_qos_event.c" void _rclpy_context_capsule_destructor(PyObject * capsule) @@ -2498,6 +2466,9 @@ rclpy_wait_set_add_entity(PyObject * Py_UNUSED(self), PyObject * args) return NULL; } ret = rcl_wait_set_add_guard_condition(wait_set, guard_condition, &index); + } else if (0 == strcmp(entity_type, "event")) { + rcl_event_t * event = (rcl_event_t *)PyCapsule_GetPointer(pyentity, "rcl_event_t"); + ret = rcl_wait_set_add_event(wait_set, event, &index); } else { ret = RCL_RET_ERROR; // to avoid a linter warning PyErr_Format(PyExc_RuntimeError, @@ -2563,6 +2534,9 @@ rclpy_wait_set_is_ready(PyObject * Py_UNUSED(self), PyObject * args) } else if (0 == strcmp(entity_type, "guard_condition")) { entities = (void *)wait_set->guard_conditions; num_entities = wait_set->size_of_guard_conditions; + } else if (0 == strcmp(entity_type, "event")) { + entities = (void *)wait_set->events; + num_entities = wait_set->size_of_events; } else { PyErr_Format(PyExc_RuntimeError, "'%s' is not a known entity", entity_type); @@ -3634,9 +3608,9 @@ rclpy_assert_liveliness(PyObject * Py_UNUSED(self), PyObject * args) return NULL; } } else if (PyCapsule_IsValid(pyentity, "rclpy_publisher_t")) { - rcl_publisher_t * publisher = (rcl_publisher_t *)PyCapsule_GetPointer( + rclpy_publisher_t * publisher = (rclpy_publisher_t *)PyCapsule_GetPointer( pyentity, "rclpy_publisher_t"); - if (RCL_RET_OK != rcl_publisher_assert_liveliness(publisher)) { + if (RCL_RET_OK != rcl_publisher_assert_liveliness(&publisher->publisher)) { PyErr_Format(PyExc_RuntimeError, "Failed to assert liveliness on the Publisher: %s", rcl_get_error_string().str); rcl_reset_error(); @@ -4645,6 +4619,11 @@ static PyMethodDef rclpy_methods[] = { "rclpy_create_timer", rclpy_create_timer, METH_VARARGS, "Create a Timer." }, + { + "rclpy_create_event", rclpy_create_event, METH_VARARGS, + "Create an Event." + }, + { "rclpy_create_guard_condition", rclpy_create_guard_condition, METH_VARARGS, "Create a general purpose guard_condition." @@ -4771,6 +4750,10 @@ static PyMethodDef rclpy_methods[] = { "rclpy_take_response", rclpy_take_response, METH_VARARGS, "rclpy_take_response." }, + { + "rclpy_take_event", rclpy_take_event, METH_VARARGS, + "Get the pending data for a ready QoS Event." + }, { "rclpy_ok", rclpy_ok, METH_VARARGS, diff --git a/rclpy/src/rclpy/_rclpy_qos_event.c b/rclpy/src/rclpy/_rclpy_qos_event.c new file mode 100644 index 000000000..f8659df36 --- /dev/null +++ b/rclpy/src/rclpy/_rclpy_qos_event.c @@ -0,0 +1,347 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <rcl/event.h> + +typedef union _qos_event_callback_data { + // Subscription events + rmw_requested_deadline_missed_status_t requested_deadline_missed; + rmw_liveliness_changed_status_t liveliness_changed; + // Publisher events + rmw_offered_deadline_missed_status_t offered_deadline_missed; + rmw_liveliness_lost_status_t liveliness_lost; +} _qos_event_callback_data_t; + +typedef PyObject * (* _qos_event_data_filler_function)(_qos_event_callback_data_t *); + +static +bool +_check_rcl_return(rcl_ret_t ret, const char * error_msg) +{ + if (RCL_RET_OK != ret) { + PyErr_Format(PyExc_RuntimeError, + "%s: %s", error_msg, rcl_get_error_string().str); + rcl_reset_error(); + return false; + } + return true; +} + +static +void +_destroy_event_capsule(PyObject * pycapsule) +{ + rcl_event_t * event = (rcl_event_t *)PyCapsule_GetPointer(pycapsule, "rcl_event_t"); + if (!event) { + PyErr_Clear(); + int stack_level = 1; + PyErr_WarnFormat( + PyExc_RuntimeWarning, stack_level, "_destroy_event_capsule failed to get pointer"); + } + rcl_ret_t ret = rcl_event_fini(event); + if (RCL_RET_OK != ret) { + int stack_level = 1; + PyErr_WarnFormat( + PyExc_RuntimeWarning, stack_level, "Failed to fini event: %s", rcl_get_error_string().str); + } + PyMem_Free(event); +} + +static +bool +_is_pycapsule_rcl_subscription(PyObject * pycapsule) +{ + return PyCapsule_IsValid(pycapsule, "rclpy_subscription_t"); +} + +static +bool +_is_pycapsule_rcl_publisher(PyObject * pycapsule) +{ + return PyCapsule_IsValid(pycapsule, "rclpy_publisher_t"); +} + +static +rcl_event_t * +_pycapsule_to_rcl_event(PyObject * pycapsule) +{ + return (rcl_event_t *)PyCapsule_GetPointer(pycapsule, "rcl_event_t"); +} + +static +rcl_event_t * +_new_zero_initialized_rcl_event() +{ + rcl_event_t * event = (rcl_event_t *)PyMem_Malloc(sizeof(rcl_event_t)); + if (!event) { + PyErr_Format(PyExc_MemoryError, "Failed to allocate memory for event"); + return NULL; + } + *event = rcl_get_zero_initialized_event(); + return event; +} + +/// Create a Python object of the given type from the qos_event module. +/** + * \param[in] class_name The name of the rclpy.qos_event class to construct. + * \param[in] args Tuple-like arguments to the constructor of the type. + * NOTE this function steals a reference to `args` instead of just borrowing it. + */ +static +PyObject * _create_py_qos_event(const char * class_name, PyObject * args) +{ + PyObject * pyqos_event_module = NULL; + PyObject * pyqos_event_class = NULL; + PyObject * pyqos_event = NULL; + + pyqos_event_module = PyImport_ImportModule("rclpy.qos_event"); + if (!pyqos_event_module) { + goto cleanup; + } + + pyqos_event_class = PyObject_GetAttrString(pyqos_event_module, class_name); + if (!pyqos_event_class) { + goto cleanup; + } + + pyqos_event = PyObject_CallObject(pyqos_event_class, args); + +cleanup: + Py_XDECREF(pyqos_event_module); + Py_XDECREF(pyqos_event_class); + Py_XDECREF(args); + + return pyqos_event; +} + +static +PyObject * +_requested_deadline_missed_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_requested_deadline_missed_status_t * actual_data = &data->requested_deadline_missed; + PyObject * args = Py_BuildValue( + "ii", + actual_data->total_count, + actual_data->total_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSRequestedDeadlineMissedInfo", args); +} + +static +PyObject * +_liveliness_changed_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_liveliness_changed_status_t * actual_data = &data->liveliness_changed; + PyObject * args = Py_BuildValue( + "iiii", + actual_data->alive_count, + actual_data->not_alive_count, + actual_data->alive_count_change, + actual_data->not_alive_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSLivelinessChangedInfo", args); +} + +static +PyObject * +_offered_deadline_missed_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_offered_deadline_missed_status_t * actual_data = &data->offered_deadline_missed; + PyObject * args = Py_BuildValue( + "ii", + actual_data->total_count, + actual_data->total_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSOfferedDeadlineMissedInfo", args); +} + +static +PyObject * +_liveliness_lost_to_py_object(_qos_event_callback_data_t * data) +{ + rmw_liveliness_lost_status_t * actual_data = &data->liveliness_lost; + PyObject * args = Py_BuildValue( + "ii", + actual_data->total_count, + actual_data->total_count_change); + if (!args) { + return NULL; + } + return _create_py_qos_event("QoSLivelinessLostInfo", args); +} + +static +_qos_event_data_filler_function +_get_qos_event_data_filler_function_for(PyObject * pyparent, unsigned PY_LONG_LONG event_type) +{ + if (_is_pycapsule_rcl_subscription(pyparent)) { + switch (event_type) { + case RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED: + return &_requested_deadline_missed_to_py_object; + case RCL_SUBSCRIPTION_LIVELINESS_CHANGED: + return &_liveliness_changed_to_py_object; + default: + PyErr_Format(PyExc_ValueError, + "Event type %llu for Subscriptions not understood by rclpy.", event_type); + } + } else if (_is_pycapsule_rcl_publisher(pyparent)) { + switch (event_type) { + case RCL_PUBLISHER_OFFERED_DEADLINE_MISSED: + return &_offered_deadline_missed_to_py_object; + case RCL_PUBLISHER_LIVELINESS_LOST: + return &_liveliness_lost_to_py_object; + default: + PyErr_Format(PyExc_ValueError, + "Event type %llu for Publishers not understood by rclpy.", event_type); + } + } else { + PyErr_Format(PyExc_TypeError, + "Parent handle was not a valid Publisher or Subscription."); + } + return NULL; +} + +/// Create an event object for QoS event handling. +/** + * This function will create an event handle for the given Subscription or Publisher parent. + * + * Raises MemoryError if the event can't be allocated. + * Raises RuntimeError on initialization failure. + * Raises TypeError if the capsules are not the correct types. + * + * \param[in] event_type Enum value of + * rcl_publisher_event_type_t or rcl_subscription_event_type_t, chosen by the type of pyparent. + * \param[in] pyparent Capsule containing the parent Publisher or Subscription. + * \return capsule containing rcl_event_t. + * \return NULL on failure. + */ +static PyObject * +rclpy_create_event(PyObject * Py_UNUSED(self), PyObject * args) +{ + unsigned PY_LONG_LONG event_type; + PyObject * pyparent = NULL; + + rcl_ret_t ret; + rcl_subscription_t * subscription = NULL; + rcl_publisher_t * publisher = NULL; + rcl_event_t * event = NULL; + + PyObject * pyevent = NULL; + + if (!PyArg_ParseTuple(args, "KO", &event_type, &pyparent)) { + return NULL; + } + + if (_is_pycapsule_rcl_subscription(pyparent)) { + rclpy_subscription_t * py_subscription = + (rclpy_subscription_t *)PyCapsule_GetPointer(pyparent, "rclpy_subscription_t"); + subscription = py_subscription ? &py_subscription->subscription : NULL; + } else if (_is_pycapsule_rcl_publisher(pyparent)) { + rclpy_publisher_t * py_publisher = + (rclpy_publisher_t *)PyCapsule_GetPointer(pyparent, "rclpy_publisher_t"); + publisher = py_publisher ? &py_publisher->publisher : NULL; + } else { + PyErr_Format(PyExc_TypeError, "Event parent was not a valid Publisher or Subscription."); + return NULL; + } + + event = _new_zero_initialized_rcl_event(); + if (!event) { + return NULL; + } + + if (subscription) { + ret = rcl_subscription_event_init(event, subscription, event_type); + } else { + ret = rcl_publisher_event_init(event, publisher, event_type); + } + if (!_check_rcl_return(ret, "Failed to initialize event")) { + PyMem_Free(event); + return NULL; + } + + pyevent = PyCapsule_New(event, "rcl_event_t", _destroy_event_capsule); + if (!pyevent) { + ret = rcl_event_fini(event); + PyMem_Free(event); + _check_rcl_return(ret, "Failed to fini 'rcl_event_t'"); + return NULL; + } + + return pyevent; +} + +/// Get a pending QoS event's data. +/** + * After having determined that a middleware event is ready, get the callback payload. + * + * Raises RuntimeError on failure to take the event from the middleware. + * Raises TypeError if the capsules are not the correct types. + * Raises ValueError on unknown event_type argument. + * + * \param[in] pyevent Event handle from rclpy_create_event. + * \param[in] event_type Enum value of + * rcl_publisher_event_type_t or rcl_subscription_event_type_t, chosen by the type of pyparent. + * \param[in] pyparent Capsule containing the parent Publisher or Subscription. + * \return Python object from rclpy.qos_event containing callback data. + * \return NULL on failure. + */ +static PyObject * +rclpy_take_event(PyObject * Py_UNUSED(self), PyObject * args) +{ + // Arguments + PyObject * pyevent = NULL; + PyObject * pyparent = NULL; + unsigned PY_LONG_LONG event_type; + + // Type conversion + rcl_ret_t ret; + rcl_event_t * event = NULL; + _qos_event_callback_data_t event_data; + _qos_event_data_filler_function event_filler = NULL; + + if (!PyArg_ParseTuple(args, "OOK", &pyevent, &pyparent, &event_type)) { + return NULL; + } + + event = _pycapsule_to_rcl_event(pyevent); + if (!event) { + return NULL; + } + + event_filler = _get_qos_event_data_filler_function_for(pyparent, event_type); + if (!event_filler) { + return NULL; + } + + ret = rcl_take_event(event, &event_data); + if (RCL_RET_UNSUPPORTED == ret) { + PyErr_Format(PyExc_NotImplementedError, + "Take event is not implemented in the current RMW implementation: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } else if (RCL_RET_OK != ret) { + PyErr_Format(PyExc_RuntimeError, "Failed to take event: %s", rcl_get_error_string().str); + rcl_reset_error(); + } else { + return event_filler(&event_data); + } + return NULL; +} diff --git a/rclpy/src/rclpy_common/include/rclpy_common/common.h b/rclpy/src/rclpy_common/include/rclpy_common/common.h index 7c80eaa3a..f3a30f3ac 100644 --- a/rclpy/src/rclpy_common/include/rclpy_common/common.h +++ b/rclpy/src/rclpy_common/include/rclpy_common/common.h @@ -17,6 +17,7 @@ #include <Python.h> #include <rcl/graph.h> // rcl_names_and_types_t +#include <rcl/rcl.h> #include <rmw/types.h> #include "rclpy_common/visibility_control.h" @@ -26,6 +27,38 @@ typedef void destroy_ros_message_signature (void *); typedef bool convert_from_py_signature (PyObject *, void *); typedef PyObject * convert_to_py_signature (void *); +typedef struct +{ + // Important: a pointer to a structure is also a pointer to its first member. + // The subscription must be first in the struct to compare sub.handle.pointer to an address + // in a wait set. + rcl_subscription_t subscription; + rcl_node_t * node; +} rclpy_subscription_t; + +typedef struct +{ + rcl_publisher_t publisher; + rcl_node_t * node; +} rclpy_publisher_t; + +typedef struct +{ + // Important: a pointer to a structure is also a pointer to its first member. + // The client must be first in the struct to compare cli.handle.pointer to an address + // in a wait set. + rcl_client_t client; + rcl_node_t * node; +} rclpy_client_t; + +typedef struct +{ + // Important: a pointer to a structure is also a pointer to its first member. + // The service must be first in the struct to compare srv.handle.pointer to an address + // in a wait set. + rcl_service_t service; + rcl_node_t * node; +} rclpy_service_t; /// Finalize names and types struct with error setting. /** diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py new file mode 100644 index 000000000..07e10cd93 --- /dev/null +++ b/rclpy/test/test_qos_event.py @@ -0,0 +1,237 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import Mock + +import rclpy +from rclpy.handle import Handle +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.qos_event import PublisherEventCallbacks +from rclpy.qos_event import QoSLivelinessChangedInfo +from rclpy.qos_event import QoSLivelinessLostInfo +from rclpy.qos_event import QoSOfferedDeadlineMissedInfo +from rclpy.qos_event import QoSPublisherEventType +from rclpy.qos_event import QoSRequestedDeadlineMissedInfo +from rclpy.qos_event import QoSSubscriptionEventType +from rclpy.qos_event import SubscriptionEventCallbacks + +from test_msgs.msg import Empty as EmptyMsg + + +class TestQoSEvent(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.context = rclpy.context.Context() + rclpy.init(context=cls.context) + cls.node = rclpy.create_node('TestQoSEvent', namespace='/rclpy/test', context=cls.context) + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown(context=cls.context) + + def test_publisher_constructor(self): + callbacks = PublisherEventCallbacks() + liveliness_callback = Mock() + deadline_callback = Mock() + + # No arg + publisher = self.node.create_publisher(EmptyMsg, 'test_topic') + self.assertEqual(len(publisher.event_handlers), 0) + self.node.destroy_publisher(publisher) + + # Arg with no callbacks + publisher = self.node.create_publisher(EmptyMsg, 'test_topic', event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), 0) + self.node.destroy_publisher(publisher) + + # Arg with one of the callbacks + callbacks.deadline = deadline_callback + publisher = self.node.create_publisher(EmptyMsg, 'test_topic', event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), 1) + self.node.destroy_publisher(publisher) + + # Arg with both callbacks + callbacks.liveliness = liveliness_callback + publisher = self.node.create_publisher(EmptyMsg, 'test_topic', event_callbacks=callbacks) + self.assertEqual(len(publisher.event_handlers), 2) + self.node.destroy_publisher(publisher) + + def test_subscription_constructor(self): + callbacks = SubscriptionEventCallbacks() + liveliness_callback = Mock() + deadline_callback = Mock() + message_callback = Mock() + + # No arg + subscription = self.node.create_subscription(EmptyMsg, 'test_topic', message_callback) + self.assertEqual(len(subscription.event_handlers), 0) + self.node.destroy_subscription(subscription) + + # Arg with no callbacks + subscription = self.node.create_subscription( + EmptyMsg, 'test_topic', message_callback, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), 0) + self.node.destroy_subscription(subscription) + + # Arg with one of the callbacks + callbacks.deadline = deadline_callback + subscription = self.node.create_subscription( + EmptyMsg, 'test_topic', message_callback, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), 1) + self.node.destroy_subscription(subscription) + + # Arg with both callbacks + callbacks.liveliness = liveliness_callback + subscription = self.node.create_subscription( + EmptyMsg, 'test_topic', message_callback, event_callbacks=callbacks) + self.assertEqual(len(subscription.event_handlers), 2) + self.node.destroy_subscription(subscription) + + def _create_event_handle(self, parent_entity, event_type): + with parent_entity.handle as parent_capsule: + event_capsule = _rclpy.rclpy_create_event(event_type, parent_capsule) + self.assertIsNotNone(event_capsule) + return Handle(event_capsule) + + def _do_create_destroy(self, parent_entity, event_type): + handle = self._create_event_handle(parent_entity, event_type) + handle.destroy() + + def test_publisher_event_create_destroy(self): + publisher = self.node.create_publisher(EmptyMsg, 'test_topic') + self._do_create_destroy( + publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) + self._do_create_destroy( + publisher, QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST) + self.node.destroy_publisher(publisher) + + def test_subscription_event_create_destroy(self): + message_callback = Mock() + subscription = self.node.create_subscription(EmptyMsg, 'test_topic', message_callback) + self._do_create_destroy( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED) + self._do_create_destroy( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) + self.node.destroy_subscription(subscription) + + def test_call_publisher_rclpy_event_apis(self): + # Go through the exposed apis and ensure that things don't explode when called + # Make no assumptions about being able to actually receive the events + publisher = self.node.create_publisher(EmptyMsg, 'test_topic') + wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() + _rclpy.rclpy_wait_set_init(wait_set, 0, 0, 0, 0, 0, 2, self.context.handle) + + deadline_event_handle = self._create_event_handle( + publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) + with deadline_event_handle as capsule: + deadline_event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, capsule) + self.assertIsNotNone(deadline_event_index) + + liveliness_event_handle = self._create_event_handle( + publisher, QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST) + with liveliness_event_handle as capsule: + liveliness_event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, capsule) + self.assertIsNotNone(liveliness_event_index) + + # We live in our own namespace and have created no other participants, so + # there can't be any of these events. + _rclpy.rclpy_wait(wait_set, 0) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, deadline_event_index)) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, liveliness_event_index)) + + # Calling take data even though not ready should provide me an empty initialized message + # Tests data conversion utilities in C side + try: + with deadline_event_handle as event_capsule, publisher.handle as publisher_capsule: + event_data = _rclpy.rclpy_take_event( + event_capsule, + publisher_capsule, + QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED) + self.assertIsInstance(event_data, QoSOfferedDeadlineMissedInfo) + self.assertEqual(event_data.total_count, 0) + self.assertEqual(event_data.total_count_change, 0) + except NotImplementedError: + pass + + try: + with liveliness_event_handle as event_capsule, publisher.handle as publisher_capsule: + event_data = _rclpy.rclpy_take_event( + event_capsule, + publisher_capsule, + QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST) + self.assertIsInstance(event_data, QoSLivelinessLostInfo) + self.assertEqual(event_data.total_count, 0) + self.assertEqual(event_data.total_count_change, 0) + except NotImplementedError: + pass + + self.node.destroy_publisher(publisher) + + def test_call_subscription_rclpy_event_apis(self): + # Go through the exposed apis and ensure that things don't explode when called + # Make no assumptions about being able to actually receive the events + subscription = self.node.create_subscription(EmptyMsg, 'test_topic', Mock()) + wait_set = _rclpy.rclpy_get_zero_initialized_wait_set() + _rclpy.rclpy_wait_set_init(wait_set, 0, 0, 0, 0, 0, 2, self.context.handle) + + deadline_event_handle = self._create_event_handle( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) + with deadline_event_handle as capsule: + deadline_event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, capsule) + self.assertIsNotNone(deadline_event_index) + + liveliness_event_handle = self._create_event_handle( + subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED) + with liveliness_event_handle as capsule: + liveliness_event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, capsule) + self.assertIsNotNone(liveliness_event_index) + + # We live in our own namespace and have created no other participants, so + # there can't be any of these events. + _rclpy.rclpy_wait(wait_set, 0) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, deadline_event_index)) + self.assertFalse(_rclpy.rclpy_wait_set_is_ready('event', wait_set, liveliness_event_index)) + + # Calling take data even though not ready should provide me an empty initialized message + # Tests data conversion utilities in C side + try: + with deadline_event_handle as event_capsule, subscription.handle as parent_capsule: + event_data = _rclpy.rclpy_take_event( + event_capsule, + parent_capsule, + QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED) + self.assertIsInstance(event_data, QoSRequestedDeadlineMissedInfo) + self.assertEqual(event_data.total_count, 0) + self.assertEqual(event_data.total_count_change, 0) + except NotImplementedError: + pass + + try: + with liveliness_event_handle as event_capsule, subscription.handle as parent_capsule: + event_data = _rclpy.rclpy_take_event( + event_capsule, + parent_capsule, + QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED) + self.assertIsInstance(event_data, QoSLivelinessChangedInfo) + self.assertEqual(event_data.alive_count, 0) + self.assertEqual(event_data.alive_count_change, 0) + self.assertEqual(event_data.not_alive_count, 0) + self.assertEqual(event_data.not_alive_count_change, 0) + except NotImplementedError: + pass + + self.node.destroy_subscription(subscription) diff --git a/rclpy/test/test_waitable.py b/rclpy/test/test_waitable.py index d62ace2b8..73dd09a6a 100644 --- a/rclpy/test/test_waitable.py +++ b/rclpy/test/test_waitable.py @@ -381,21 +381,23 @@ def test_waitable_with_mutually_exclusive_callback_group(self): class TestNumberOfEntities(unittest.TestCase): def test_add(self): - n1 = NumberOfEntities(1, 2, 3, 4, 5) - n2 = NumberOfEntities(10, 20, 30, 40, 50) + n1 = NumberOfEntities(1, 2, 3, 4, 5, 6) + n2 = NumberOfEntities(10, 20, 30, 40, 50, 60) n = n1 + n2 assert n.num_subscriptions == 11 assert n.num_guard_conditions == 22 assert n.num_timers == 33 assert n.num_clients == 44 assert n.num_services == 55 + assert n.num_events == 66 def test_add_assign(self): - n1 = NumberOfEntities(1, 2, 3, 4, 5) - n2 = NumberOfEntities(10, 20, 30, 40, 50) + n1 = NumberOfEntities(1, 2, 3, 4, 5, 6) + n2 = NumberOfEntities(10, 20, 30, 40, 50, 60) n1 += n2 assert n1.num_subscriptions == 11 assert n1.num_guard_conditions == 22 assert n1.num_timers == 33 assert n1.num_clients == 44 assert n1.num_services == 55 + assert n1.num_events == 66