Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QoS - API and implementation for Liveliness and Deadline event callbacks #316

Merged
merged 8 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ if(BUILD_TESTING)
test/test_parameter.py
test/test_parameters_callback.py
test/test_qos.py
test/test_qos_event.py
test/test_task.py
test/test_time_source.py
test/test_time.py
Expand Down
36 changes: 29 additions & 7 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
from rclpy.qos import qos_profile_parameter_events
from rclpy.qos import qos_profile_services_default
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.service import Service
from rclpy.subscription import Subscription
from rclpy.time_source import TimeSource
Expand Down Expand Up @@ -696,16 +698,23 @@ def create_publisher(
msg_type,
topic: str,
*,
qos_profile: QoSProfile = qos_profile_default
qos_profile: QoSProfile = qos_profile_default,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[PublisherEventCallbacks] = None,
) -> Publisher:
"""
Create a new publisher.

:param msg_type: The type of ROS messages the publisher will publish.
:param topic: The name of the topic the publisher will publish to.
:param qos_profile: The quality of service profile to apply to the publisher.
:param callback_group: The callback group for the publisher's event handlers.
If ``None``, then the node's default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:return: The new publisher.
"""
callback_group = callback_group or self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -721,9 +730,16 @@ def create_publisher(
publisher_handle = Handle(publisher_capsule)
publisher_handle.requires(self.handle)

publisher = Publisher(publisher_handle, msg_type, topic, qos_profile)
publisher = Publisher(
publisher_handle, msg_type, topic, qos_profile,
event_callbacks=event_callbacks or PublisherEventCallbacks(),
callback_group=callback_group)
self.__publishers.append(publisher)
self._wake_executor()

for event_callback in publisher.event_handlers:
self.add_waitable(event_callback)

return publisher

def create_subscription(
Expand All @@ -733,7 +749,8 @@ def create_subscription(
callback: Callable[[MsgType], None],
*,
qos_profile: QoSProfile = qos_profile_default,
callback_group: CallbackGroup = None,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
raw: bool = False
) -> Subscription:
"""
Expand All @@ -746,11 +763,12 @@ def create_subscription(
:param qos_profile: The quality of service profile to apply to the subscription.
:param callback_group: The callback group for the subscription. If ``None``, then the
nodes default callback group is used.
:param event_callbacks: User-defined callbacks for middleware events.
:param raw: If ``True``, then received messages will be stored in raw binary
representation.
"""
if callback_group is None:
callback_group = self.default_callback_group
callback_group = callback_group or self.default_callback_group

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
Expand All @@ -768,10 +786,14 @@ def create_subscription(

subscription = Subscription(
subscription_handle, msg_type,
topic, callback, callback_group, qos_profile, raw)
topic, callback, callback_group, qos_profile, raw,
event_callbacks=event_callbacks or SubscriptionEventCallbacks())
self.__subscriptions.append(subscription)
callback_group.add_entity(subscription)
self._wake_executor()

for event_handler in subscription.event_handlers:
self.add_waitable(event_handler)

return subscription

def create_client(
Expand Down
10 changes: 9 additions & 1 deletion rclpy/rclpy/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

from typing import TypeVar

from rclpy.callback_groups import CallbackGroup
from rclpy.handle import Handle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks

MsgType = TypeVar('MsgType')

Expand All @@ -24,10 +27,12 @@ class Publisher:

def __init__(
self,
publisher_handle,
publisher_handle: Handle,
msg_type: MsgType,
topic: str,
qos_profile: QoSProfile,
event_callbacks: PublisherEventCallbacks,
callback_group: CallbackGroup,
) -> None:
"""
Create a container for a ROS publisher.
Expand All @@ -48,6 +53,9 @@ def __init__(
self.topic = topic
self.qos_profile = qos_profile

self.event_handlers = event_callbacks.create_event_handlers(
callback_group, publisher_handle)

def publish(self, msg: MsgType) -> None:
"""
Send a message to the topic for the publisher.
Expand Down
231 changes: 231 additions & 0 deletions rclpy/rclpy/qos_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import IntEnum
from typing import Callable
from typing import List
from typing import NamedTuple
from typing import Optional

import rclpy
from rclpy.callback_groups import CallbackGroup
from rclpy.handle import Handle
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.waitable import NumberOfEntities
from rclpy.waitable import Waitable


class QoSPublisherEventType(IntEnum):
"""
Enum for types of QoS events that a Publisher can receive.

This enum matches the one defined in rcl/event.h
"""

RCL_PUBLISHER_OFFERED_DEADLINE_MISSED = 0
RCL_PUBLISHER_LIVELINESS_LOST = 1


class QoSSubscriptionEventType(IntEnum):
"""
Enum for types of QoS events that a Subscription can receive.

This enum matches the one defined in rcl/event.h
"""

RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED = 0
RCL_SUBSCRIPTION_LIVELINESS_CHANGED = 1


"""
Payload type for Subscription Deadline callback.

Mirrors rmw_requested_deadline_missed_status_t from rmw/types.h
"""
QoSRequestedDeadlineMissedInfo = NamedTuple(
'QoSRequestedDeadlineMissedInfo', [
('total_count', 'int'),
('total_count_change', 'int'),
])

"""
Payload type for Subscription Liveliness callback.

Mirrors rmw_liveliness_changed_status_t from rmw/types.h
"""
QoSLivelinessChangedInfo = NamedTuple(
'QoSLivelinessChangedInfo', [
('alive_count', 'int'),
('not_alive_count', 'int'),
('alive_count_change', 'int'),
('not_alive_count_change', 'int'),
])

"""
Payload type for Publisher Deadline callback.

Mirrors rmw_offered_deadline_missed_status_t from rmw/types.h
"""
QoSOfferedDeadlineMissedInfo = NamedTuple(
'QoSOfferedDeadlineMissedInfo', [
('total_count', 'int'),
('total_count_change', 'int'),
])

"""
Payload type for Publisher Liveliness callback.

Mirrors rmw_liveliness_lost_status_t from rmw/types.h
"""
QoSLivelinessLostInfo = NamedTuple(
'QoSLivelinessLostInfo', [
('total_count', 'int'),
('total_count_change', 'int'),
])


class QoSEventHandler(Waitable):
"""Waitable type to handle QoS events."""

def __init__(
self,
*,
callback_group: CallbackGroup,
callback: Callable,
event_type: IntEnum,
parent_handle: Handle,
):
# Waitable init adds self to callback_group
super().__init__(callback_group)
self.event_type = event_type
self.callback = callback

self._parent_handle = parent_handle
with parent_handle as parent_capsule:
event_capsule = _rclpy.rclpy_create_event(event_type, parent_capsule)
self._event_handle = Handle(event_capsule)
self._event_handle.requires(self._parent_handle)
self._ready_to_take_data = False
self._event_index = None

# Start Waitable API
def is_ready(self, wait_set):
"""Return True if entities are ready in the wait set."""
if self._event_index is None:
return False
if _rclpy.rclpy_wait_set_is_ready('event', wait_set, self._event_index):
self._ready_to_take_data = True
return self._ready_to_take_data

def take_data(self):
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
if self._ready_to_take_data:
self._ready_to_take_data = False
with self._parent_handle as parent_capsule, self._event_handle as event_capsule:
return _rclpy.rclpy_take_event(event_capsule, parent_capsule, self.event_type)
return None

async def execute(self, taken_data):
"""Execute work after data has been taken from a ready wait set."""
if not taken_data:
return
await rclpy.executors.await_or_execute(self.callback, [taken_data])

def get_num_entities(self):
"""Return number of each type of entity used."""
return NumberOfEntities(num_events=1)
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved

def add_to_wait_set(self, wait_set):
"""Add entites to wait set."""
with self._event_handle as event_capsule:
self._event_index = _rclpy.rclpy_wait_set_add_entity('event', wait_set, event_capsule)
# End Waitable API


class SubscriptionEventCallbacks:
"""Container to provide middleware event callbacks for a Subscription."""

def __init__(
self,
*,
deadline: Optional[Callable[[QoSRequestedDeadlineMissedInfo], None]] = None,
liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None,
) -> None:
"""
Constructor.

:param deadline: A user-defined callback that is called when a topic misses our
requested Deadline.
:param liveliness: A user-defined callback that is called when the Liveliness of
a Publisher on subscribed topic changes.
"""
self.deadline = deadline
self.liveliness = liveliness

def create_event_handlers(
self, callback_group: CallbackGroup, subscription_handle: Handle,
) -> List[QoSEventHandler]:
event_handlers = []
if self.deadline:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.deadline,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED,
parent_handle=subscription_handle))
if self.liveliness:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.liveliness,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_LIVELINESS_CHANGED,
parent_handle=subscription_handle))
return event_handlers


class PublisherEventCallbacks:
"""Container to provide middleware event callbacks for a Publisher."""

def __init__(
self,
*,
deadline: Optional[Callable[[QoSOfferedDeadlineMissedInfo], None]] = None,
liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None
) -> None:
"""
Constructor.

:param deadline: A user-defined callback that is called when the Publisher misses
its offered Deadline.
:param liveliness: A user-defined callback that is called when this Publisher
fails to signal its Liveliness and is reported as not-alive.
"""
self.deadline = deadline
self.liveliness = liveliness

def create_event_handlers(
self, callback_group: CallbackGroup, publisher_handle: Handle,
) -> List[QoSEventHandler]:
event_handlers = []
if self.deadline:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.deadline,
event_type=QoSPublisherEventType.RCL_PUBLISHER_OFFERED_DEADLINE_MISSED,
parent_handle=publisher_handle))
if self.liveliness:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.liveliness,
event_type=QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST,
parent_handle=publisher_handle))
return event_handlers
Loading