diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6f63de9e5d..cfba094dac 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -11,3 +11,8 @@ be under the Apache 2 License, as dictated by that the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. ~~~ + +Contributors must sign-off each commit by adding a `Signed-off-by: ...` +line to commit messages to certify that they have the right to submit +the code they are contributing to the project according to the +[Developer Certificate of Origin (DCO)](https://developercertificate.org/). diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index df866a99c6..6d6bc01b97 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -65,6 +65,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/parameter_map.cpp src/rclcpp/parameter_service.cpp src/rclcpp/publisher.cpp + src/rclcpp/qos_event.cpp src/rclcpp/service.cpp src/rclcpp/signal_handler.cpp src/rclcpp/subscription.cpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index c53bffbd7d..0a5c49cbb7 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -22,6 +22,7 @@ #include "rclcpp/client.hpp" #include "rclcpp/service.hpp" +#include "rclcpp/publisher.hpp" #include "rclcpp/subscription.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" @@ -61,6 +62,10 @@ class CallbackGroup RCLCPP_PUBLIC explicit CallbackGroup(CallbackGroupType group_type); + RCLCPP_PUBLIC + const std::vector & + get_publisher_ptrs() const; + RCLCPP_PUBLIC const std::vector & get_subscription_ptrs() const; @@ -92,6 +97,10 @@ class CallbackGroup protected: RCLCPP_DISABLE_COPY(CallbackGroup) + RCLCPP_PUBLIC + void + add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr); + RCLCPP_PUBLIC void add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr); @@ -119,6 +128,7 @@ class CallbackGroup CallbackGroupType type_; // Mutex to protect the subsequent vectors of pointers. mutable std::mutex mutex_; + std::vector publisher_ptrs_; std::vector subscription_ptrs_; std::vector timer_ptrs_; std::vector service_ptrs_; diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index f136f33653..e21b00687b 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -31,6 +31,8 @@ create_publisher( rclcpp::node_interfaces::NodeTopicsInterface * node_topics, const std::string & topic_name, const rmw_qos_profile_t & qos_profile, + const PublisherEventCallbacks & event_callbacks, + rclcpp::callback_group::CallbackGroup::SharedPtr group, bool use_intra_process_comms, std::shared_ptr allocator) { @@ -39,10 +41,12 @@ create_publisher( auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_publisher_factory(allocator), + rclcpp::create_publisher_factory(event_callbacks, allocator), publisher_options, use_intra_process_comms); - node_topics->add_publisher(pub); + + node_topics->add_publisher(pub, group); + return std::dynamic_pointer_cast(pub); } diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index a4ea581092..aa1f547039 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -38,6 +38,7 @@ create_subscription( const std::string & topic_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, + const SubscriptionEventCallbacks & event_callbacks, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, bool use_intra_process_comms, @@ -52,7 +53,10 @@ create_subscription( auto factory = rclcpp::create_subscription_factory ( - std::forward(callback), msg_mem_strat, allocator); + std::forward(callback), + event_callbacks, + msg_mem_strat, + allocator); auto sub = node_topics->create_subscription( topic_name, diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index eb41a93e11..72df5dd82a 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -78,7 +78,6 @@ class MultiThreadedExecutor : public executor::Executor size_t number_of_threads_; bool yield_before_execute_; - std::mutex scheduled_timers_mutex_; std::set scheduled_timers_; }; diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 416890ed6c..70db12237a 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -51,6 +51,7 @@ class RCLCPP_PUBLIC MemoryStrategy virtual size_t number_of_ready_subscriptions() const = 0; virtual size_t number_of_ready_services() const = 0; virtual size_t number_of_ready_clients() const = 0; + virtual size_t number_of_ready_events() const = 0; virtual size_t number_of_ready_timers() const = 0; virtual size_t number_of_guard_conditions() const = 0; virtual size_t number_of_waitables() const = 0; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index a3fe7eb472..3f0cde52e5 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -142,76 +142,27 @@ class Node : public std::enable_shared_from_this /// Create and return a Publisher. /** * \param[in] topic_name The topic for this publisher to publish on. - * \param[in] qos_history_depth The depth of the publisher message queue. - * \param[in] allocator Optional custom allocator. + * \param[in] group The callback group for this publisher. NULL for no callback group. + * \param[in] options Additional options to control creation of the publisher. * \return Shared pointer to the created publisher. */ template< - typename MessageT, typename Alloc = std::allocator, - typename PublisherT = ::rclcpp::Publisher> - std::shared_ptr - create_publisher( - const std::string & topic_name, size_t qos_history_depth, - std::shared_ptr allocator = nullptr); - - /// Create and return a Publisher. - /** - * \param[in] topic_name The topic for this publisher to publish on. - * \param[in] qos_profile The quality of service profile to pass on to the rmw implementation. - * \param[in] allocator Optional custom allocator. - * \return Shared pointer to the created publisher. - */ - template< - typename MessageT, typename Alloc = std::allocator, + typename MessageT, + typename Alloc = std::allocator, typename PublisherT = ::rclcpp::Publisher> std::shared_ptr create_publisher( const std::string & topic_name, - const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, - std::shared_ptr allocator = nullptr); - - /// Create and return a Subscription. - /** - * \param[in] topic_name The topic to subscribe on. - * \param[in] callback The user-defined callback function. - * \param[in] qos_profile The quality of service profile to pass on to the rmw implementation. - * \param[in] group The callback group for this subscription. NULL for no callback group. - * \param[in] ignore_local_publications True to ignore local publications. - * \param[in] msg_mem_strat The message memory strategy to use for allocating messages. - * \param[in] allocator Optional custom allocator. - * \return Shared pointer to the created subscription. - */ - /* TODO(jacquelinekay): - Windows build breaks when static member function passed as default - argument to msg_mem_strat, nullptr is a workaround. - */ - template< - typename MessageT, - typename CallbackT, - typename Alloc = std::allocator, - typename SubscriptionT = rclcpp::Subscription< - typename rclcpp::subscription_traits::has_message_type::type, Alloc>> - std::shared_ptr - create_subscription( - const std::string & topic_name, - CallbackT && callback, - const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, - bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr - msg_mem_strat = nullptr, - std::shared_ptr allocator = nullptr); + const PublisherOptions & options = PublisherOptions()); /// Create and return a Subscription. /** * \param[in] topic_name The topic to subscribe on. - * \param[in] qos_history_depth The depth of the subscription's incoming message queue. * \param[in] callback The user-defined callback function. * \param[in] group The callback group for this subscription. NULL for no callback group. - * \param[in] ignore_local_publications True to ignore local publications. + * \param[in] options Additional options to control creation of the subscription. * \param[in] msg_mem_strat The message memory strategy to use for allocating messages. - * \param[in] allocator Optional custom allocator. * \return Shared pointer to the created subscription. */ /* TODO(jacquelinekay): @@ -228,13 +179,11 @@ class Node : public std::enable_shared_from_this create_subscription( const std::string & topic_name, CallbackT && callback, - size_t qos_history_depth, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, - bool ignore_local_publications = false, + const SubscriptionOptions & options = SubscriptionOptions(), typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr - msg_mem_strat = nullptr, - std::shared_ptr allocator = nullptr); + msg_mem_strat = nullptr); /// Create a timer. /** @@ -600,6 +549,18 @@ class Node : public std::enable_shared_from_this const NodeOptions & get_node_options() const; + + /// Manually assert that this Node is alive (for RMW_QOS_POLICY_MANUAL_BY_NODE) + /** + * If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_NODE, the creator of this + * node must manually call `assert_liveliness` on a regular basis to signal to the rest of the + * system that this Node is still alive. + * This function must be called at least as often as the qos_profile's liveliness_lease_duration + */ + RCLCPP_PUBLIC + void + assert_liveliness() {} + protected: /// Construct a sub-node, which will extend the namespace of all entities created with it. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index a2ef35b9bb..7fb4e177e1 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -51,20 +51,6 @@ namespace rclcpp { -template -std::shared_ptr -Node::create_publisher( - const std::string & topic_name, size_t qos_history_depth, - std::shared_ptr allocator) -{ - if (!allocator) { - allocator = std::make_shared(); - } - rmw_qos_profile_t qos = rmw_qos_profile_default; - qos.depth = qos_history_depth; - return this->create_publisher(topic_name, qos, allocator); -} - RCLCPP_LOCAL inline std::string @@ -80,9 +66,11 @@ extend_name_with_sub_namespace(const std::string & name, const std::string & sub template std::shared_ptr Node::create_publisher( - const std::string & topic_name, const rmw_qos_profile_t & qos_profile, - std::shared_ptr allocator) + const std::string & topic_name, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + const PublisherOptions & options) { + std::shared_ptr allocator = options.allocator(); if (!allocator) { allocator = std::make_shared(); } @@ -90,7 +78,9 @@ Node::create_publisher( return rclcpp::create_publisher( this->node_topics_.get(), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), - qos_profile, + options.qos_profile(), + options.event_callbacks(), + group, this->get_node_options().use_intra_process_comms(), allocator); } @@ -104,16 +94,15 @@ std::shared_ptr Node::create_subscription( const std::string & topic_name, CallbackT && callback, - const rmw_qos_profile_t & qos_profile, rclcpp::callback_group::CallbackGroup::SharedPtr group, - bool ignore_local_publications, + const SubscriptionOptions & options, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr - msg_mem_strat, - std::shared_ptr allocator) + msg_mem_strat) { using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type::type; + std::shared_ptr allocator = options.allocator(); if (!allocator) { allocator = std::make_shared(); } @@ -127,44 +116,15 @@ Node::create_subscription( this->node_topics_.get(), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), std::forward(callback), - qos_profile, + options.qos_profile(), + options.event_callbacks(), group, - ignore_local_publications, + options.ignore_local_publications(), this->get_node_options().use_intra_process_comms(), msg_mem_strat, allocator); } -template< - typename MessageT, - typename CallbackT, - typename Alloc, - typename SubscriptionT> -std::shared_ptr -Node::create_subscription( - const std::string & topic_name, - CallbackT && callback, - size_t qos_history_depth, - rclcpp::callback_group::CallbackGroup::SharedPtr group, - bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy< - typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr - msg_mem_strat, - std::shared_ptr allocator) -{ - rmw_qos_profile_t qos = rmw_qos_profile_default; - qos.depth = qos_history_depth; - - return this->create_subscription( - topic_name, - std::forward(callback), - qos, - group, - ignore_local_publications, - msg_mem_strat, - allocator); -} - template typename rclcpp::WallTimer::SharedPtr Node::create_wall_timer( diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp index 31cf62e54c..40beb3aacb 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface virtual void add_publisher( - rclcpp::PublisherBase::SharedPtr publisher); + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group); RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp index b3d46ff334..d888e3ccd2 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp @@ -57,7 +57,8 @@ class NodeTopicsInterface virtual void add_publisher( - rclcpp::PublisherBase::SharedPtr publisher) = 0; + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0; RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index 879f261836..6b68a793fa 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -127,6 +127,7 @@ class AsyncParametersClient "parameter_events", std::forward(callback), rmw_qos_profile_default, + SubscriptionEventCallbacks(), nullptr, // group, false, // ignore_local_publications, false, // use_intra_process_comms_, diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 4500fedab5..b02a3b7a03 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -21,19 +21,23 @@ #include #include #include +#include #include #include #include "rcl/error_handling.h" #include "rcl/publisher.h" +#include "rosidl_typesupport_cpp/message_type_support.hpp" #include "rcl_interfaces/msg/intra_process_message.hpp" +#include "rclcpp/publisher_options.hpp" +#include "rclcpp/waitable.hpp" +#include "rclcpp/qos_event.hpp" +#include "rclcpp/exceptions.hpp" #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/macros.hpp" -#include "rclcpp/node_interfaces/node_base_interface.hpp" -#include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -42,6 +46,7 @@ namespace rclcpp // Forward declaration is used for friend statement. namespace node_interfaces { +class NodeBaseInterface; class NodeTopicsInterface; } @@ -116,6 +121,10 @@ class PublisherBase const rcl_publisher_t * get_publisher_handle() const; + RCLCPP_PUBLIC + const std::vector> & + get_event_handlers() const; + /// Get subscription count /** \return The number of subscriptions. */ RCLCPP_PUBLIC @@ -128,6 +137,17 @@ class PublisherBase size_t get_intra_process_subscription_count() const; + /// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_MANUAL_BY_TOPIC) + /** + * If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_TOPIC, the creator of this + * Publisher must manually call `assert_liveliness` on a regular basis to signal to the rest of + * the system that this Node is still alive. + * This function must be called at least as often as the qos_profile's liveliness_lease_duration + */ + RCLCPP_PUBLIC + void + assert_liveliness() {} + /// Compare this publisher to a gid. /** * Note that this function calls the next function. @@ -162,10 +182,25 @@ class PublisherBase const rcl_publisher_options_t & intra_process_options); protected: + template + void + add_event_handler( + const EventCallbackT & callback, + const rcl_publisher_event_type_t event_type) + { + event_handlers_.emplace_back(std::make_shared>( + callback, + rcl_publisher_event_init, + &publisher_handle_, + event_type)); + } + std::shared_ptr rcl_node_handle_; rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); + + std::vector> event_handlers_; using IntraProcessManagerWeakPtr = std::weak_ptr; @@ -194,6 +229,7 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rcl_publisher_options_t & publisher_options, + const PublisherEventCallbacks & event_callbacks, const std::shared_ptr & allocator) : PublisherBase( node_base, @@ -203,6 +239,15 @@ class Publisher : public PublisherBase message_allocator_(allocator) { allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + + if (event_callbacks.deadline_callback_) { + this->add_event_handler(event_callbacks.deadline_callback_, + RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); + } + if (event_callbacks.liveliness_callback_) { + this->add_event_handler(event_callbacks.liveliness_callback_, + RCL_PUBLISHER_LIVELINESS_LOST); + } } virtual ~Publisher() diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 4044d1a6cc..98ab04dc6a 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -75,13 +75,15 @@ struct PublisherFactory /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory -create_publisher_factory(std::shared_ptr allocator) +create_publisher_factory( + const PublisherEventCallbacks & event_callbacks, + std::shared_ptr allocator) { PublisherFactory factory; // factory function that creates a MessageT specific PublisherT factory.create_typed_publisher = - [allocator]( + [event_callbacks, allocator]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, rcl_publisher_options_t & publisher_options) -> std::shared_ptr @@ -89,7 +91,8 @@ create_publisher_factory(std::shared_ptr allocator) auto message_alloc = std::make_shared(*allocator.get()); publisher_options.allocator = allocator::get_rcl_allocator(*message_alloc.get()); - return std::make_shared(node_base, topic_name, publisher_options, message_alloc); + return std::make_shared(node_base, topic_name, publisher_options, + event_callbacks, message_alloc); }; // function to add a publisher to the intra process manager diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp new file mode 100644 index 0000000000..1b87f6c4c2 --- /dev/null +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -0,0 +1,191 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__PUBLISHER_OPTIONS_HPP_ +#define RCLCPP__PUBLISHER_OPTIONS_HPP_ + +#include +#include +#include + +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/qos_event.hpp" + + +namespace rclcpp +{ + +struct PublisherEventCallbacks +{ + QOSDeadlineOfferedCallbackType deadline_callback_; + QOSLivelinessLostCallbackType liveliness_callback_; +}; + +template> +class PublisherOptions +{ +public: + /// Create PublisherOptions with default values, optionally specifying the allocator to use. + /** + * Default values for the node options: + * + * - context = rclcpp::contexts::default_context::get_global_default_context() + * - arguments = {} + * - initial_parameters = {} + * - use_global_arguments = true + * - use_intra_process_comms = false + * - start_parameter_services = true + * - start_publisher_publisher = true + * - publisher_qos_profile = rmw_qos_profile_default + * - allocator = rcl_get_default_allocator() + * + * \param[in] allocator allocator to use in construction of PublisherOptions. + */ + RCLCPP_PUBLIC + PublisherOptions() = default; + + RCLCPP_PUBLIC + explicit PublisherOptions(const rmw_qos_profile_t & qos_profile) + : publisher_qos_profile_(qos_profile) {} + + /// Destructor. + RCLCPP_PUBLIC + virtual + ~PublisherOptions() = default; + + /// Copy constructor. + RCLCPP_PUBLIC + PublisherOptions(const PublisherOptions & other) = default; + + /// Assignment operator. + RCLCPP_PUBLIC + PublisherOptions & + operator=(const PublisherOptions & other) = default; + + + /// Return a reference to the publisher_qos_profile QoS. + RCLCPP_PUBLIC + const rmw_qos_profile_t & + qos_profile() const + { + return publisher_qos_profile_; + } + + RCLCPP_PUBLIC + rmw_qos_profile_t & + qos_profile() + { + return publisher_qos_profile_; + } + + /// Set the publisher_qos_profile QoS, return this for parameter idiom. + /** + * The QoS settings to be used for the publisher + */ + RCLCPP_PUBLIC + PublisherOptions & + qos_profile(const rmw_qos_profile_t & publisher_qos_profile) + { + publisher_qos_profile_ = publisher_qos_profile; + return *this; + } + + RCLCPP_PUBLIC + size_t & + qos_history_depth() + { + return publisher_qos_profile_.depth; + } + + RCLCPP_PUBLIC + PublisherOptions & + qos_history_depth(size_t depth) + { + publisher_qos_profile_.depth = depth; + return *this; + } + + + RCLCPP_PUBLIC + const QOSDeadlineOfferedCallbackType & + deadline_callback() const + { + return callbacks_.deadline_callback_; + } + + RCLCPP_PUBLIC + PublisherOptions & + deadline_callback(const QOSDeadlineOfferedCallbackType & callback) + { + callbacks_.deadline_callback_ = callback; + return *this; + } + + RCLCPP_PUBLIC + const QOSLivelinessLostCallbackType & + liveliness_callback() const + { + return callbacks_.liveliness_callback_; + } + + RCLCPP_PUBLIC + PublisherOptions & + liveliness_callback(const QOSLivelinessLostCallbackType & callback) + { + callbacks_.liveliness_callback_ = callback; + return *this; + } + + RCLCPP_PUBLIC + const PublisherEventCallbacks & + event_callbacks() const + { + return callbacks_; + } + + + /// Return the rcl_allocator_t to be used. + RCLCPP_PUBLIC + std::shared_ptr + allocator() const + { + return allocator_; + } + + /// Set the rcl_allocator_t to be used, may cause deallocation of existing rcl_publisher_options_t + /** + * This will cause the internal rcl_publisher_options_t struct to be invalidated. + */ + RCLCPP_PUBLIC + PublisherOptions & + allocator(std::shared_ptr allocator) + { + allocator_ = allocator; + return *this; + } + +private: + // IMPORTANT: if any of these default values are changed, please update the + // documentation in this class. + + PublisherEventCallbacks callbacks_; + + rmw_qos_profile_t publisher_qos_profile_ = rmw_qos_profile_default; + + std::shared_ptr allocator_ = nullptr; +}; + +} // namespace rclcpp + +#endif // RCLCPP__PUBLISHER_OPTIONS_HPP_ diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp new file mode 100644 index 0000000000..08833a6a1d --- /dev/null +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -0,0 +1,119 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__QOS_EVENT_HPP_ +#define RCLCPP__QOS_EVENT_HPP_ + +#include + +#include "rcl/error_handling.h" + +#include "rcutils/logging_macros.h" + +#include "rclcpp/waitable.hpp" +#include "rclcpp/exceptions.hpp" +#include "rclcpp/function_traits.hpp" + + +namespace rclcpp +{ + +using QOSDeadlineRequestedInfo = rmw_requested_deadline_missed_status_t; +using QOSDeadlineOfferedInfo = rmw_offered_deadline_missed_status_t; +using QOSLivelinessChangedInfo = rmw_liveliness_changed_status_t; +using QOSLivelinessLostInfo = rmw_liveliness_lost_status_t; + +using QOSDeadlineRequestedCallbackType = std::function; +using QOSDeadlineOfferedCallbackType = std::function; +using QOSLivelinessChangedCallbackType = std::function; +using QOSLivelinessLostCallbackType = std::function; + + +class QOSEventHandlerBase : public Waitable +{ +public: + /// Get the number of ready events + size_t + get_number_of_ready_events() override; + + /// Add the Waitable to a wait set. + bool + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + /// Check if the Waitable is ready. + bool + is_ready(rcl_wait_set_t * wait_set) override; + +protected: + rcl_event_t event_handle_; + size_t wait_set_event_index_; +}; + + +template +class QOSEventHandler : public QOSEventHandlerBase +{ +public: + template + QOSEventHandler( + const EventCallbackT & callback, + InitFuncT init_func, + ParentHandleT parent_handle, + EventTypeEnum event_type) + : event_callback_(callback) + { + event_handle_ = rcl_get_zero_initialized_event(); + rcl_ret_t ret = init_func(&event_handle_, parent_handle, event_type); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "could not create event"); + } + } + + ~QOSEventHandler() + { + if (rcl_event_fini(&event_handle_) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Error in destruction of rcl event handle: %s", rcl_get_error_string().str); + rcl_reset_error(); + } + } + + /// Execute any entities of the Waitable that are ready. + void + execute() override + { + EventCallbackInfoT callback_info; + + rcl_ret_t ret = rcl_take_event(&event_handle_, &callback_info); + if (ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't take event info: %s", rcl_get_error_string().str); + return; + } + + event_callback_(callback_info); + } + +private: + using EventCallbackInfoT = typename std::remove_reference::template argument_type<0>>::type; + + EventCallbackT event_callback_; +}; + +} // namespace rclcpp + +#endif // RCLCPP__QOS_EVENT_HPP_ diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index ca1d2fba3c..1e73d1ccbf 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -164,6 +164,14 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group || !group->can_be_taken_from().load()) { continue; } + for (auto & weak_publisher : group->get_publisher_ptrs()) { + auto publisher = weak_publisher.lock(); + if (publisher) { + for (auto & publisher_event : publisher->get_event_handlers()) { + waitable_handles_.push_back(publisher_event); + } + } + } for (auto & weak_subscription : group->get_subscription_ptrs()) { auto subscription = weak_subscription.lock(); if (subscription) { @@ -172,6 +180,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy subscription_handles_.push_back( subscription->get_intra_process_subscription_handle()); } + for (auto & subscription_event : subscription->get_event_handlers()) { + waitable_handles_.push_back(subscription_event); + } } } for (auto & weak_service : group->get_service_ptrs()) { @@ -430,6 +441,15 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy return number_of_services; } + size_t number_of_ready_events() const + { + size_t number_of_events = 0; + for (auto waitable : waitable_handles_) { + number_of_events += waitable->get_number_of_ready_events(); + } + return number_of_events; + } + size_t number_of_ready_clients() const { size_t number_of_clients = client_handles_.size(); diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2a2a3e4eef..6e12d85414 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -29,7 +30,10 @@ #include "rcl_interfaces/msg/intra_process_message.hpp" +#include "rclcpp/subscription_options.hpp" +#include "rclcpp/waitable.hpp" #include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/qos_event.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/macros.hpp" @@ -99,6 +103,10 @@ class SubscriptionBase virtual const std::shared_ptr get_intra_process_subscription_handle() const; + RCLCPP_PUBLIC + const std::vector> & + get_event_handlers() const; + /// Borrow a new message. /** \return Shared pointer to the fresh message. */ virtual std::shared_ptr @@ -145,13 +153,34 @@ class SubscriptionBase get_publisher_count() const; protected: - std::shared_ptr intra_process_subscription_handle_; - std::shared_ptr subscription_handle_; - std::shared_ptr node_handle_; + template + void + add_event_handler( + const EventCallbackT & callback, + const rcl_subscription_event_type_t event_type) + { + event_handlers_.emplace_back(std::make_shared>( + callback, + rcl_subscription_event_init, + get_subscription_handle().get(), + event_type)); + } using IntraProcessManagerWeakPtr = std::weak_ptr; + + std::shared_ptr node_handle_; + + std::shared_ptr subscription_handle_; + size_t wait_set_subscription_index_; + bool subscription_ready_; + + std::vector> event_handlers_; + bool use_intra_process_; + std::shared_ptr intra_process_subscription_handle_; + size_t wait_set_intra_process_subscription_index_; + bool intra_process_subscription_ready_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_subscription_id_; @@ -195,6 +224,7 @@ class Subscription : public SubscriptionBase const std::string & topic_name, const rcl_subscription_options_t & subscription_options, AnySubscriptionCallback callback, + const SubscriptionEventCallbacks & event_callbacks, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) @@ -208,7 +238,16 @@ class Subscription : public SubscriptionBase message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), matches_any_intra_process_publishers_(nullptr) - {} + { + if (event_callbacks.deadline_callback_) { + this->add_event_handler(event_callbacks.deadline_callback_, + RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED); + } + if (event_callbacks.liveliness_callback_) { + this->add_event_handler(event_callbacks.liveliness_callback_, + RCL_SUBSCRIPTION_LIVELINESS_CHANGED); + } + } /// Support dynamically setting the message memory strategy. /** diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index e2bdec7ef2..905e10d3de 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -75,6 +75,7 @@ template< SubscriptionFactory create_subscription_factory( CallbackT && callback, + const SubscriptionEventCallbacks & event_callbacks, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc>::SharedPtr msg_mem_strat, @@ -91,7 +92,7 @@ create_subscription_factory( // factory function that creates a MessageT specific SubscriptionT factory.create_typed_subscription = - [allocator, msg_mem_strat, any_subscription_callback, message_alloc]( + [allocator, msg_mem_strat, any_subscription_callback, event_callbacks, message_alloc]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, rcl_subscription_options_t & subscription_options @@ -109,6 +110,7 @@ create_subscription_factory( topic_name, subscription_options, any_subscription_callback, + event_callbacks, msg_mem_strat); auto sub_base_ptr = std::dynamic_pointer_cast(sub); return sub_base_ptr; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp new file mode 100644 index 0000000000..6d6dc9fda4 --- /dev/null +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -0,0 +1,201 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_OPTIONS_HPP_ +#define RCLCPP__SUBSCRIPTION_OPTIONS_HPP_ + +#include +#include +#include + +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/qos_event.hpp" + + +namespace rclcpp +{ + +struct SubscriptionEventCallbacks +{ + QOSDeadlineRequestedCallbackType deadline_callback_; + QOSLivelinessChangedCallbackType liveliness_callback_; +}; + +template> +class SubscriptionOptions +{ +public: + /// Create SubscriptionOptions with default values, optionally specifying the allocator to use. + /** + * Default values for the node options: + * + * - deadline_callback = nullptr + * - liveliness_callback = nullptr + * - subscription_qos_profile = rmw_qos_profile_default + * - ignore_local_publications = false + * - allocator = nullptr + * + * \param[in] allocator allocator to use in construction of SubscriptionOptions. + */ + RCLCPP_PUBLIC + SubscriptionOptions() = default; + + RCLCPP_PUBLIC + explicit SubscriptionOptions(const rmw_qos_profile_t & qos_profile) + : subscription_qos_profile_(qos_profile) {} + + /// Destructor. + RCLCPP_PUBLIC + virtual + ~SubscriptionOptions() = default; + + /// Copy constructor. + RCLCPP_PUBLIC + SubscriptionOptions(const SubscriptionOptions & other) = default; + + /// Assignment operator. + RCLCPP_PUBLIC + SubscriptionOptions & + operator=(const SubscriptionOptions & other) = default; + + + /// Return a reference to the subscription_qos_profile QoS. + RCLCPP_PUBLIC + const rmw_qos_profile_t & + qos_profile() const + { + return subscription_qos_profile_; + } + + RCLCPP_PUBLIC + rmw_qos_profile_t & + qos_profile() + { + return subscription_qos_profile_; + } + + /// Set the subscription_qos_profile QoS, return this for parameter idiom. + /** + * The QoS settings to be used for the subscription + */ + RCLCPP_PUBLIC + SubscriptionOptions & + qos_profile(const rmw_qos_profile_t & subscription_qos_profile) + { + subscription_qos_profile_ = subscription_qos_profile; + return *this; + } + + RCLCPP_PUBLIC + size_t & + qos_history_depth() + { + return subscription_qos_profile_.depth; + } + + RCLCPP_PUBLIC + SubscriptionOptions & + qos_history_depth(size_t depth) + { + subscription_qos_profile_.depth = depth; + return *this; + } + + + RCLCPP_PUBLIC + const QOSDeadlineRequestedCallbackType & + deadline_callback() const + { + return callbacks_.deadline_callback_; + } + + RCLCPP_PUBLIC + SubscriptionOptions & + deadline_callback(const QOSDeadlineRequestedCallbackType & callback) + { + callbacks_.deadline_callback_ = callback; + return *this; + } + + RCLCPP_PUBLIC + const QOSLivelinessChangedCallbackType & + liveliness_callback() const + { + return callbacks_.liveliness_callback_; + } + + RCLCPP_PUBLIC + SubscriptionOptions & + liveliness_callback(const QOSLivelinessChangedCallbackType & callback) + { + callbacks_.liveliness_callback_ = callback; + return *this; + } + + + RCLCPP_PUBLIC + const SubscriptionEventCallbacks & + event_callbacks() const + { + return callbacks_; + } + + + RCLCPP_PUBLIC + bool + ignore_local_publications() const + { + return ignore_local_publications_; + } + + RCLCPP_PUBLIC + SubscriptionOptions & + ignore_local_publications(bool ignore) + { + ignore_local_publications_ = ignore; + return *this; + } + + /// Return the std::shared_ptr to be used. + RCLCPP_PUBLIC + std::shared_ptr + allocator() const + { + return allocator_; + } + + RCLCPP_PUBLIC + SubscriptionOptions & + allocator(std::shared_ptr allocator) + { + allocator_ = allocator; + return *this; + } + +private: + // IMPORTANT: if any of these default values are changed, please update the + // documentation in this class. + + SubscriptionEventCallbacks callbacks_; + + rmw_qos_profile_t subscription_qos_profile_ = rmw_qos_profile_default; + + bool ignore_local_publications_ = false; + + std::shared_ptr allocator_ = nullptr; +}; + +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_OPTIONS_HPP_ diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index 4f48bbc238..5ff01792b7 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -61,6 +61,17 @@ class Waitable size_t get_number_of_ready_clients(); + /// Get the number of ready events + /** + * Returns a value of 0 by default. + * This should be overridden if the Waitable contains one or more events. + * \return The number of events associated with the Waitable. + */ + RCLCPP_PUBLIC + virtual + size_t + get_number_of_ready_events(); + /// Get the number of ready services /** * Returns a value of 0 by default. diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index b812afc516..e269a6120b 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -23,6 +23,13 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type) : type_(group_type), can_be_taken_from_(true) {} +const std::vector & +CallbackGroup::get_publisher_ptrs() const +{ + std::lock_guard lock(mutex_); + return publisher_ptrs_; +} + const std::vector & CallbackGroup::get_subscription_ptrs() const { @@ -70,6 +77,14 @@ CallbackGroup::type() const return type_; } +void +CallbackGroup::add_publisher( + const rclcpp::PublisherBase::SharedPtr publisher_ptr) +{ + std::lock_guard lock(mutex_); + publisher_ptrs_.push_back(publisher_ptr); +} + void CallbackGroup::add_subscription( const rclcpp::SubscriptionBase::SharedPtr subscription_ptr) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index ac837314a4..f833bcf0a3 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -60,7 +60,8 @@ Executor::Executor(const ExecutorArgs & args) // Store the context for later use. context_ = args.context; - ret = rcl_wait_set_init(&wait_set_, 0, 2, 0, 0, 0, context_->get_rcl_context().get(), allocator); + ret = rcl_wait_set_init(&wait_set_, 0, 2, 0, 0, 0, 0, + context_->get_rcl_context().get(), allocator); if (RCL_RET_OK != ret) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -434,7 +435,8 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rcl_ret_t ret = rcl_wait_set_resize( &wait_set_, memory_strategy_->number_of_ready_subscriptions(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services()); + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); if (RCL_RET_OK != ret) { throw std::runtime_error( std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 08318534b2..6c04d81315 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -82,7 +82,6 @@ MultiThreadedExecutor::run(size_t) } if (any_exec.timer) { // Guard against multiple threads getting the same timer. - std::lock_guard lock(scheduled_timers_mutex_); if (scheduled_timers_.count(any_exec.timer) != 0) { continue; } @@ -96,7 +95,7 @@ MultiThreadedExecutor::run(size_t) execute_any_executable(any_exec); if (any_exec.timer) { - std::lock_guard lock(scheduled_timers_mutex_); + std::lock_guard wait_lock(wait_mutex_); auto it = scheduled_timers_.find(any_exec.timer); if (it != scheduled_timers_.end()) { scheduled_timers_.erase(it); diff --git a/rclcpp/src/rclcpp/graph_listener.cpp b/rclcpp/src/rclcpp/graph_listener.cpp index 1a181898c4..b097564333 100644 --- a/rclcpp/src/rclcpp/graph_listener.cpp +++ b/rclcpp/src/rclcpp/graph_listener.cpp @@ -79,6 +79,7 @@ GraphListener::start_if_not_started() 0, // number_of_timers 0, // number_of_clients 0, // number_of_services + 0, // number_of_events this->parent_context_->get_rcl_context().get(), rcl_get_default_allocator()); if (RCL_RET_OK != ret) { @@ -145,7 +146,7 @@ GraphListener::run_loop() const size_t node_graph_interfaces_size = node_graph_interfaces_.size(); // Add 2 for the interrupt and shutdown guard conditions if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) { - ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0); + ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0, 0); if (RCL_RET_OK != ret) { throw_from_rcl_error(ret, "failed to resize wait set"); } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp b/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp index 8c4d14b94c..345715bd6f 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp @@ -59,6 +59,8 @@ NodeParameters::NodeParameters( node_topics.get(), "parameter_events", parameter_event_qos_profile, + PublisherEventCallbacks(), + nullptr, use_intra_process, allocator); } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index b380e3c2ec..99e73b586a 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -64,11 +64,19 @@ NodeTopics::create_publisher( void NodeTopics::add_publisher( - rclcpp::PublisherBase::SharedPtr publisher) + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) { - // The publisher is not added to a callback group or anthing like that for now. - // It may be stored within the NodeTopics class or the NodeBase class in the future. - (void)publisher; + // Assign to a group. + if (callback_group) { + if (!node_base_->callback_group_in_node(callback_group)) { + throw std::runtime_error("Cannot create publisher, callback group not in node."); + } + callback_group->add_publisher(publisher); + } else { + node_base_->get_default_callback_group()->add_publisher(publisher); + } + // Notify the executor that a new publisher was created using the parent Node. { auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock(); diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher.cpp index ce6ebadaee..addbcf114b 100644 --- a/rclcpp/src/rclcpp/publisher.cpp +++ b/rclcpp/src/rclcpp/publisher.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -154,6 +155,12 @@ PublisherBase::get_publisher_handle() const return &publisher_handle_; } +const std::vector> & +PublisherBase::get_event_handlers() const +{ + return event_handlers_; +} + size_t PublisherBase::get_subscription_count() const { diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp new file mode 100644 index 0000000000..d3058f99f4 --- /dev/null +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -0,0 +1,49 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/qos_event.hpp" + + +namespace rclcpp +{ + +/// Get the number of ready events +size_t +QOSEventHandlerBase::get_number_of_ready_events() +{ + return 1; +} + +/// Add the Waitable to a wait set. +bool +QOSEventHandlerBase::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + if (rcl_wait_set_add_event(wait_set, &event_handle_, &wait_set_event_index_) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add event to wait set: %s", rcl_get_error_string().str); + return false; + } + + return true; +} + +/// Check if the Waitable is ready. +bool +QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) +{ + return wait_set->events[wait_set_event_index_] == &event_handle_; +} + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/subscription.cpp b/rclcpp/src/rclcpp/subscription.cpp index ed26acaf17..438f2e2ed8 100644 --- a/rclcpp/src/rclcpp/subscription.cpp +++ b/rclcpp/src/rclcpp/subscription.cpp @@ -121,6 +121,12 @@ SubscriptionBase::get_intra_process_subscription_handle() const return intra_process_subscription_handle_; } +const std::vector> & +SubscriptionBase::get_event_handlers() const +{ + return event_handlers_; +} + const rosidl_message_type_support_t & SubscriptionBase::get_message_type_support_handle() const { diff --git a/rclcpp/src/rclcpp/time_source.cpp b/rclcpp/src/rclcpp/time_source.cpp index 173ccf054f..8ea80664da 100644 --- a/rclcpp/src/rclcpp/time_source.cpp +++ b/rclcpp/src/rclcpp/time_source.cpp @@ -214,6 +214,7 @@ void TimeSource::create_clock_sub() topic_name, std::move(cb), rmw_qos_profile_default, + rclcpp::SubscriptionEventCallbacks(), group, false, false, diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index a29b80de1d..542b10a016 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -34,6 +34,12 @@ Waitable::get_number_of_ready_clients() return 0u; } +size_t +Waitable::get_number_of_ready_events() +{ + return 0u; +} + size_t Waitable::get_number_of_ready_services() { diff --git a/rclcpp/test/test_time_source.cpp b/rclcpp/test/test_time_source.cpp index da84b82d05..02a7a8e7f2 100644 --- a/rclcpp/test/test_time_source.cpp +++ b/rclcpp/test/test_time_source.cpp @@ -129,8 +129,7 @@ void trigger_clock_changes( std::shared_ptr clock, bool expect_time_update = true) { - auto clock_pub = node->create_publisher("clock", - rmw_qos_profile_default); + auto clock_pub = node->create_publisher("clock"); for (int i = 0; i < 5; ++i) { if (!rclcpp::ok()) { diff --git a/rclcpp_action/test/test_client.cpp b/rclcpp_action/test/test_client.cpp index 32afe75310..6199d29af2 100644 --- a/rclcpp_action/test/test_client.cpp +++ b/rclcpp_action/test/test_client.cpp @@ -204,8 +204,8 @@ class TestClient : public ::testing::Test ret = rcl_action_get_status_topic_name( action_name, allocator, &status_topic_name); ASSERT_EQ(RCL_RET_OK, ret); - status_publisher = server_node->create_publisher( - status_topic_name, rcl_action_qos_profile_status_default); + status_publisher = server_node->create_publisher(status_topic_name, + nullptr, rclcpp::PublisherOptions<>(rcl_action_qos_profile_status_default)); ASSERT_TRUE(status_publisher != nullptr); allocator.deallocate(status_topic_name, allocator.state); server_executor.add_node(server_node); diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp index 2e368dac8c..32459d3593 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node.hpp @@ -43,6 +43,7 @@ #include "rclcpp/node_interfaces/node_logging_interface.hpp" #include "rclcpp/node_interfaces/node_parameters_interface.hpp" #include "rclcpp/node_interfaces/node_services_interface.hpp" +#include "rclcpp/node_interfaces/node_time_source_interface.hpp" #include "rclcpp/node_interfaces/node_timers_interface.hpp" #include "rclcpp/node_interfaces/node_topics_interface.hpp" #include "rclcpp/node_interfaces/node_waitables_interface.hpp" @@ -254,6 +255,26 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, rcl_interfaces::msg::SetParametersResult set_parameters_atomically(const std::vector & parameters); + template + void + set_parameter_if_not_set( + const std::string & name, + const ParameterT & value); + + /// Set a map of parameters with the same prefix. + /** + * For each key in the map, a parameter with a name of "name.key" will be set + * to the value in the map. + * + * \param[in] name The prefix of the parameters to set. + * \param[in] values The parameters to set in the given prefix. + */ + template + void + set_parameters_if_not_set( + const std::string & name, + const std::map & values); + RCLCPP_LIFECYCLE_PUBLIC std::vector get_parameters(const std::vector & names) const; @@ -272,6 +293,49 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, bool get_parameter(const std::string & name, ParameterT & parameter) const; + /// Assign the value of the map parameter if set into the values argument. + /** + * Parameter names that are part of a map are of the form "name.member". + * This API gets all parameters that begin with "name", storing them into the + * map with the name of the parameter and their value. + * If there are no members in the named map, then the "values" argument is not changed. + * + * \param[in] name The prefix of the parameters to get. + * \param[out] values The map of output values, with one std::string,MapValueT + * per parameter. + * \returns true if values was changed, false otherwise + */ + template + bool + get_parameters( + const std::string & name, + std::map & values) const; + + template + bool + get_parameter_or( + const std::string & name, + ParameterT & value, + const ParameterT & alternative_value) const; + + /// Get the parameter value; if not set, set the "alternative value" and store it in the node. + /** + * If the parameter is set, then the "value" argument is assigned the value + * in the parameter. + * If the parameter is not set, then the "value" argument is assigned the "alternative_value", + * and the parameter is set to the "alternative_value" on the node. + * + * \param[in] name The name of the parameter to get. + * \param[out] value The output where the value of the parameter should be assigned. + * \param[in] alternative_value Value to be stored in output and parameter if the parameter was not set. + */ + template + void + get_parameter_or_set( + const std::string & name, + ParameterT & value, + const ParameterT & alternative_value); + RCLCPP_LIFECYCLE_PUBLIC std::vector describe_parameters(const std::vector & names) const; @@ -383,6 +447,11 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, rclcpp::node_interfaces::NodeParametersInterface::SharedPtr get_node_parameters_interface(); + /// Return the Node's internal NodeParametersInterface implementation. + RCLCPP_LIFECYCLE_PUBLIC + rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr + get_node_time_source_interface(); + /// Return the Node's internal NodeWaitablesInterface implementation. RCLCPP_LIFECYCLE_PUBLIC rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr @@ -513,6 +582,7 @@ class LifecycleNode : public node_interfaces::LifecycleNodeInterface, rclcpp::node_interfaces::NodeServicesInterface::SharedPtr node_services_; rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_; rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters_; + rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr node_time_source_; rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_; bool use_intra_process_comms_; diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp index 69d8a48e96..ae83aca436 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp @@ -15,12 +15,15 @@ #ifndef RCLCPP_LIFECYCLE__LIFECYCLE_NODE_IMPL_HPP_ #define RCLCPP_LIFECYCLE__LIFECYCLE_NODE_IMPL_HPP_ +#include #include #include #include +#include #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/event.hpp" #include "rclcpp/parameter.hpp" #include "rclcpp/create_publisher.hpp" #include "rclcpp/create_service.hpp" @@ -63,6 +66,8 @@ LifecycleNode::create_publisher( this->node_topics_.get(), topic_name, qos_profile, + rclcpp::PublisherEventCallbacks(), + nullptr, use_intra_process_comms_, allocator); } @@ -97,6 +102,7 @@ LifecycleNode::create_subscription( topic_name, std::forward(callback), qos_profile, + rclcpp::SubscriptionEventCallbacks(), group, ignore_local_publications, use_intra_process_comms_, @@ -201,5 +207,91 @@ LifecycleNode::register_param_change_callback(CallbackT && callback) this->node_parameters_->register_param_change_callback(std::forward(callback)); } +template +void +LifecycleNode::set_parameter_if_not_set( + const std::string & name, + const ParameterT & value) +{ + rclcpp::Parameter parameter; + if (!this->get_parameter(name, parameter)) { + this->set_parameters({ + rclcpp::Parameter(name, value), + }); + } +} + +// this is a partially-specialized version of set_parameter_if_not_set above, +// where our concrete type for ParameterT is std::map, but the to-be-determined +// type is the value in the map. +template +void +LifecycleNode::set_parameters_if_not_set( + const std::string & name, + const std::map & values) +{ + std::vector params; + + for (const auto & val : values) { + std::string param_name = name + "." + val.first; + rclcpp::Parameter parameter; + if (!this->get_parameter(param_name, parameter)) { + params.push_back(rclcpp::Parameter(param_name, val.second)); + } + } + + this->set_parameters(params); +} + +// this is a partially-specialized version of get_parameter above, +// where our concrete type for ParameterT is std::map, but the to-be-determined +// type is the value in the map. +template +bool +LifecycleNode::get_parameters( + const std::string & name, + std::map & values) const +{ + std::map params; + bool result = node_parameters_->get_parameters_by_prefix(name, params); + if (result) { + for (const auto & param : params) { + values[param.first] = param.second.get_value(); + } + } + + return result; +} + +template +bool +LifecycleNode::get_parameter_or( + const std::string & name, + ParameterT & value, + const ParameterT & alternative_value) const +{ + bool got_parameter = get_parameter(name, value); + if (!got_parameter) { + value = alternative_value; + } + return got_parameter; +} + +template +void +LifecycleNode::get_parameter_or_set( + const std::string & name, + ParameterT & value, + const ParameterT & alternative_value) +{ + bool got_parameter = get_parameter(name, value); + if (!got_parameter) { + this->set_parameters({ + rclcpp::Parameter(name, alternative_value), + }); + value = alternative_value; + } +} + } // namespace rclcpp_lifecycle #endif // RCLCPP_LIFECYCLE__LIFECYCLE_NODE_IMPL_HPP_ diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 867714e600..10096a84c4 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -61,9 +61,10 @@ class LifecyclePublisher : public LifecyclePublisherInterface, rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rcl_publisher_options_t & publisher_options, + const rclcpp::PublisherEventCallbacks event_callbacks, std::shared_ptr allocator) : rclcpp::Publisher( - node_base, topic, publisher_options, allocator), + node_base, topic, publisher_options, event_callbacks, allocator), enabled_(false), logger_(rclcpp::get_logger("LifecyclePublisher")) { diff --git a/rclcpp_lifecycle/src/lifecycle_node.cpp b/rclcpp_lifecycle/src/lifecycle_node.cpp index 63d1dfbcba..b9a1665f4b 100644 --- a/rclcpp_lifecycle/src/lifecycle_node.cpp +++ b/rclcpp_lifecycle/src/lifecycle_node.cpp @@ -33,6 +33,7 @@ #include "rclcpp/node_interfaces/node_logging.hpp" #include "rclcpp/node_interfaces/node_parameters.hpp" #include "rclcpp/node_interfaces/node_services.hpp" +#include "rclcpp/node_interfaces/node_time_source.hpp" #include "rclcpp/node_interfaces/node_timers.hpp" #include "rclcpp/node_interfaces/node_topics.hpp" #include "rclcpp/node_interfaces/node_waitables.hpp" @@ -81,6 +82,15 @@ LifecycleNode::LifecycleNode( options.start_parameter_event_publisher(), options.parameter_event_qos_profile() )), + node_time_source_(new rclcpp::node_interfaces::NodeTimeSource( + node_base_, + node_topics_, + node_graph_, + node_services_, + node_logging_, + node_clock_, + node_parameters_ + )), node_waitables_(new rclcpp::node_interfaces::NodeWaitables(node_base_.get())), use_intra_process_comms_(options.use_intra_process_comms()), impl_(new LifecycleNodeInterfaceImpl(node_base_, node_services_)) @@ -274,6 +284,12 @@ LifecycleNode::get_node_logging_interface() return node_logging_; } +rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr +LifecycleNode::get_node_time_source_interface() +{ + return node_time_source_; +} + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr LifecycleNode::get_node_timers_interface() {