Skip to content

Commit

Permalink
Implement callbacks for liveliness and deadline QoS events (#695)
Browse files Browse the repository at this point in the history
* implement deadline and liveliness qos callbacks

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix windows build

Signed-off-by: Miaofei <miaofei@amazon.com>

* address feedback from pull request

Signed-off-by: Miaofei <miaofei@amazon.com>

* update formatting to be compatible with ros2 coding style and ament_uncrustify

Signed-off-by: Miaofei <miaofei@amazon.com>

* make QOSEventHandlerBase::add_to_wait_set() throw

Signed-off-by: Miaofei <miaofei@amazon.com>

* mark throw_from_rcl_error as [[noreturn]]

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix windows compilation error

Signed-off-by: Miaofei <miaofei@amazon.com>

* Ignore uncrustify for single [[noreturn]] syntax instance

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
mm318 authored and wjwwood committed May 3, 2019
1 parent ecf3511 commit d399fef
Show file tree
Hide file tree
Showing 37 changed files with 446 additions and 28 deletions.
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/parameter_map.cpp
src/rclcpp/parameter_service.cpp
src/rclcpp/publisher_base.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "rclcpp/client.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/timer.hpp"
Expand Down Expand Up @@ -92,6 +93,10 @@ class CallbackGroup
protected:
RCLCPP_DISABLE_COPY(CallbackGroup)

RCLCPP_PUBLIC
void
add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr);

RCLCPP_PUBLIC
void
add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr);
Expand Down
8 changes: 6 additions & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ create_publisher(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
const PublisherEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool use_intra_process_comms,
std::shared_ptr<AllocatorT> allocator)
{
Expand All @@ -39,10 +41,12 @@ create_publisher(

auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(allocator),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(event_callbacks, allocator),
publisher_options,
use_intra_process_comms);
node_topics->add_publisher(pub);

node_topics->add_publisher(pub, group);

return std::dynamic_pointer_cast<PublisherT>(pub);
}

Expand Down
6 changes: 5 additions & 1 deletion rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
const SubscriptionEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
bool use_intra_process_comms,
Expand All @@ -52,7 +53,10 @@ create_subscription(

auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator);
std::forward<CallbackT>(callback),
event_callbacks,
msg_mem_strat,
allocator);

auto sub = node_topics->create_subscription(
topic_name,
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,15 @@ class InvalidServiceNameError : public NameValidationError
* \throws std::runtime_error if the rcl_get_error_state returns 0
* \throws RCLErrorBase some child class exception based on ret
*/
/* *INDENT-OFF* */ // Uncrustify cannot yet understand [[noreturn]] properly
RCLCPP_PUBLIC
void
throw_from_rcl_error(
throw_from_rcl_error [[noreturn]] (
rcl_ret_t ret,
const std::string & prefix = "",
const rcl_error_state_t * error_state = nullptr,
void (* reset_error)() = rcl_reset_error);
/* *INDENT-ON* */

class RCLErrorBase
{
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0;
virtual size_t number_of_ready_clients() const = 0;
virtual size_t number_of_ready_events() const = 0;
virtual size_t number_of_ready_timers() const = 0;
virtual size_t number_of_guard_conditions() const = 0;
virtual size_t number_of_waitables() const = 0;
Expand Down
13 changes: 13 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,19 @@ class Node : public std::enable_shared_from_this<Node>
const NodeOptions &
get_node_options() const;

/// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE).
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE, the creator
* of this node may manually call `assert_liveliness` at some point in time to signal to the rest
* of the system that this Node is still alive.
*
* \return `true` if the liveliness was asserted successfully, otherwise `false`
*/
RCLCPP_PUBLIC
RCUTILS_WARN_UNUSED
bool
assert_liveliness() const;

protected:
/// Construct a sub-node, which will extend the namespace of all entities created with it.
/**
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ Node::create_publisher(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile,
options.event_callbacks,
options.callback_group,
use_intra_process,
allocator);
}
Expand Down Expand Up @@ -181,6 +183,7 @@ Node::create_subscription(
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
qos_profile,
options.event_callbacks,
options.callback_group,
options.ignore_local_publications,
use_intra_process,
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class NodeBase : public NodeBaseInterface
std::shared_ptr<const rcl_node_t>
get_shared_rcl_node_handle() const;

RCLCPP_PUBLIC
virtual
bool
assert_liveliness() const;

RCLCPP_PUBLIC
virtual
rclcpp::callback_group::CallbackGroup::SharedPtr
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class NodeBaseInterface
std::shared_ptr<const rcl_node_t>
get_shared_rcl_node_handle() const = 0;

/// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE).
RCLCPP_PUBLIC
virtual
bool
assert_liveliness() const = 0;

/// Create and return a callback group.
RCLCPP_PUBLIC
virtual
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/node_interfaces/node_topics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher);
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);

RCLCPP_PUBLIC
virtual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher) = 0;
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0;

RCLCPP_PUBLIC
virtual
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/parameter_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class AsyncParametersClient
"parameter_events",
std::forward<CallbackT>(callback),
rmw_qos_profile_default,
SubscriptionEventCallbacks(),
nullptr, // group,
false, // ignore_local_publications,
false, // use_intra_process_comms_,
Expand Down
11 changes: 10 additions & 1 deletion rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
Expand All @@ -59,6 +58,7 @@ class Publisher : public PublisherBase
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rcl_publisher_options_t & publisher_options,
const PublisherEventCallbacks & event_callbacks,
const std::shared_ptr<MessageAlloc> & allocator)
: PublisherBase(
node_base,
Expand All @@ -68,6 +68,15 @@ class Publisher : public PublisherBase
message_allocator_(allocator)
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());

if (event_callbacks.deadline_callback) {
this->add_event_handler(event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (event_callbacks.liveliness_callback) {
this->add_event_handler(event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
}

virtual ~Publisher()
Expand Down
37 changes: 37 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "rcl/publisher.h"

#include "rclcpp/macros.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -112,6 +114,12 @@ class PublisherBase
const rcl_publisher_t *
get_publisher_handle() const;

/// Get all the QoS event handlers associated with this publisher.
/** \return The vector of QoS event handlers. */
RCLCPP_PUBLIC
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
get_event_handlers() const;

/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
Expand All @@ -124,6 +132,19 @@ class PublisherBase
size_t
get_intra_process_subscription_count() const;

/// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator
* of this publisher may manually call `assert_liveliness` at some point in time to signal to the
* rest of the system that this Node is still alive.
*
* \return `true` if the liveliness was asserted successfully, otherwise `false`
*/
RCLCPP_PUBLIC
RCUTILS_WARN_UNUSED
bool
assert_liveliness() const;

/// Get the actual QoS settings, after the defaults have been determined.
/**
* The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT
Expand Down Expand Up @@ -175,11 +196,27 @@ class PublisherBase
const rcl_publisher_options_t & intra_process_options);

protected:
template<typename EventCallbackT>
void
add_event_handler(
const EventCallbackT & callback,
const rcl_publisher_event_type_t event_type)
{
auto handler = std::make_shared<QOSEventHandler<EventCallbackT>>(
callback,
rcl_publisher_event_init,
&publisher_handle_,
event_type);
event_handlers_.emplace_back(handler);
}

std::shared_ptr<rcl_node_t> rcl_node_handle_;

rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();

std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool intra_process_is_enabled_;
Expand Down
13 changes: 10 additions & 3 deletions rclcpp/include/rclcpp/publisher_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,28 @@ struct PublisherFactory
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>.
template<typename MessageT, typename Alloc, typename PublisherT>
PublisherFactory
create_publisher_factory(std::shared_ptr<Alloc> allocator)
create_publisher_factory(
const PublisherEventCallbacks & event_callbacks,
std::shared_ptr<Alloc> allocator)
{
PublisherFactory factory;

// factory function that creates a MessageT specific PublisherT
factory.create_typed_publisher =
[allocator](
[event_callbacks, allocator](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
rcl_publisher_options_t & publisher_options) -> std::shared_ptr<PublisherT>
{
auto message_alloc = std::make_shared<typename PublisherT::MessageAlloc>(*allocator.get());
publisher_options.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc.get());

return std::make_shared<PublisherT>(node_base, topic_name, publisher_options, message_alloc);
return std::make_shared<PublisherT>(
node_base,
topic_name,
publisher_options,
event_callbacks,
message_alloc);
};

// return the factory now that it is populated
Expand Down
6 changes: 5 additions & 1 deletion rclcpp/include/rclcpp/publisher_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <string>
#include <vector>

#include "rclcpp/callback_group.hpp"
#include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
Expand All @@ -29,10 +31,12 @@ namespace rclcpp
template<typename Allocator>
struct PublisherOptionsWithAllocator
{
PublisherEventCallbacks event_callbacks;
/// The quality of service profile to pass on to the rmw implementation.
rmw_qos_profile_t qos_profile = rmw_qos_profile_default;
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
/// Optional custom allocator.
std::shared_ptr<Allocator> allocator = nullptr;
std::shared_ptr<Allocator> allocator;
/// Setting to explicitly set intraprocess communications.
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault;
};
Expand Down
Loading

0 comments on commit d399fef

Please sign in to comment.