Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rclcpp QoS implementation #2

Merged
merged 30 commits into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
065a966
Add interfaces for events in memory_strategy
ross-desmond Feb 27, 2019
6ddded4
refactor waitables
mm318 Mar 1, 2019
a078e8e
add event callbacks to publisher, subscriber, client, service
mm318 Mar 4, 2019
1a4ccaa
fix some ros2 build issues
mm318 Mar 5, 2019
e1ad5c2
update client-facing API
mm318 Mar 12, 2019
cc8bb54
improve usability of the SubscriptionOptions and PublisherOptions cla…
mm318 Mar 13, 2019
afca6f4
Attempt to fix cppcheck (#646)
sloretz Mar 4, 2019
ec4476e
Add a method to the LifecycleNode class to get the logging interface …
Mar 6, 2019
40ba3c6
update to use separated action types (#601)
hidmic Mar 12, 2019
6821fa6
Add Doxyfile for rclcpp_action
jacobperron Mar 11, 2019
b7cdf8f
Add documentation to rclcpp_action
jacobperron Mar 12, 2019
22a6484
Don't hardcode int64_t for duration type representations (#648)
emersonknapp Mar 12, 2019
80e3d48
Fix test_time_source test (#639)
pbaughman Mar 13, 2019
bcabadc
fix lint errors
mm318 Mar 15, 2019
993788f
apply uncrustify
mm318 Mar 15, 2019
a77d110
fix wait for service memory leak bug (#656)
mm318 Mar 15, 2019
8d94f1b
Merge remote-tracking branch 'upstream/master' into qos
Mar 19, 2019
43f891d
add section about DCO to CONTRIBUTING.md
dirk-thomas Mar 20, 2019
787fa6f
update for rcl API changes
mm318 Mar 21, 2019
71ba5f3
Fix lint and build warnings and API inconsistency
Mar 21, 2019
0a44344
Avoid race that triggers timer too often (#621)
durko Mar 23, 2019
2462a94
Back out Waitable and GraphEvent-related changes
Mar 22, 2019
142b4d6
add publisher and subscription events to AllocatorMemoryStrategy
mm318 Mar 26, 2019
42b7cbe
Add stub API for assert_liveliness
Mar 26, 2019
b352d45
Fix use_sim_time issue on LifeCycleNode (#651)
vinnamkim Mar 26, 2019
ad2ba8d
revert changes to client and services
mm318 Mar 27, 2019
cb20529
Add parameter-related templates to LifecycleNode (#645)
vinnamkim Mar 28, 2019
80bcf2f
update API calls into rcl
mm318 Mar 28, 2019
4f2f8de
fix linter errors in rclcpp_lifecycle (#672)
Karsten1987 Mar 28, 2019
d8502a7
Merge remote-tracking branch 'origin/master' into qos-min
mm318 Mar 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/parameter_map.cpp
src/rclcpp/parameter_service.cpp
src/rclcpp/publisher.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription.cpp
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "rclcpp/client.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -61,6 +62,10 @@ class CallbackGroup
RCLCPP_PUBLIC
explicit CallbackGroup(CallbackGroupType group_type);

RCLCPP_PUBLIC
const std::vector<rclcpp::PublisherBase::WeakPtr> &
get_publisher_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
get_subscription_ptrs() const;
Expand Down Expand Up @@ -92,6 +97,10 @@ class CallbackGroup
protected:
RCLCPP_DISABLE_COPY(CallbackGroup)

RCLCPP_PUBLIC
void
add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr);

RCLCPP_PUBLIC
void
add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr);
Expand Down Expand Up @@ -119,6 +128,7 @@ class CallbackGroup
CallbackGroupType type_;
// Mutex to protect the subsequent vectors of pointers.
mutable std::mutex mutex_;
std::vector<rclcpp::PublisherBase::WeakPtr> publisher_ptrs_;
std::vector<rclcpp::SubscriptionBase::WeakPtr> subscription_ptrs_;
std::vector<rclcpp::TimerBase::WeakPtr> timer_ptrs_;
std::vector<rclcpp::ServiceBase::WeakPtr> service_ptrs_;
Expand Down
8 changes: 6 additions & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ create_publisher(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
const PublisherEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool use_intra_process_comms,
std::shared_ptr<AllocatorT> allocator)
{
Expand All @@ -39,10 +41,12 @@ create_publisher(

auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(allocator),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(event_callbacks, allocator),
publisher_options,
use_intra_process_comms);
node_topics->add_publisher(pub);

node_topics->add_publisher(pub, group);

return std::dynamic_pointer_cast<PublisherT>(pub);
}

Expand Down
6 changes: 5 additions & 1 deletion rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
const SubscriptionEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
bool use_intra_process_comms,
Expand All @@ -52,7 +53,10 @@ create_subscription(

auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator);
std::forward<CallbackT>(callback),
event_callbacks,
msg_mem_strat,
allocator);

auto sub = node_topics->create_subscription(
topic_name,
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0;
virtual size_t number_of_ready_clients() const = 0;
virtual size_t number_of_ready_events() const = 0;
virtual size_t number_of_ready_timers() const = 0;
virtual size_t number_of_guard_conditions() const = 0;
virtual size_t number_of_waitables() const = 0;
Expand Down
79 changes: 20 additions & 59 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,76 +142,27 @@ class Node : public std::enable_shared_from_this<Node>
/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_history_depth The depth of the publisher message queue.
* \param[in] allocator Optional custom allocator.
* \param[in] group The callback group for this publisher. NULL for no callback group.
* \param[in] options Additional options to control creation of the publisher.
* \return Shared pointer to the created publisher.
*/
template<
typename MessageT, typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator = nullptr);

/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_profile The quality of service profile to pass on to the rmw implementation.
* \param[in] allocator Optional custom allocator.
* \return Shared pointer to the created publisher.
*/
template<
typename MessageT, typename Alloc = std::allocator<void>,
typename MessageT,
typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
std::shared_ptr<PublisherT>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder: restore pre-existing create_publisher and create_subscriber APIs, and mark as deprecated

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emersonknapp Will you be carving out the PublisherOptions/SubscriptionOptions PR? Can you restore the to-be-deprecated APIs as part of that? I want to leave them out here so that uses of the to-be-deprecated APIs will error out instead of just producing some warnings.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am working on that today - once that's ready we can rebase this review onto that branch

create_publisher(
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
std::shared_ptr<Alloc> allocator = nullptr);

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] callback The user-defined callback function.
* \param[in] qos_profile The quality of service profile to pass on to the rmw implementation.
* \param[in] group The callback group for this subscription. NULL for no callback group.
* \param[in] ignore_local_publications True to ignore local publications.
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \param[in] allocator Optional custom allocator.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<
typename MessageT,
typename CallbackT,
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);
const PublisherOptions<Alloc> & options = PublisherOptions<Alloc>());

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] qos_history_depth The depth of the subscription's incoming message queue.
* \param[in] callback The user-defined callback function.
* \param[in] group The callback group for this subscription. NULL for no callback group.
* \param[in] ignore_local_publications True to ignore local publications.
* \param[in] options Additional options to control creation of the subscription.
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \param[in] allocator Optional custom allocator.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Expand All @@ -228,13 +179,11 @@ class Node : public std::enable_shared_from_this<Node>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
bool ignore_local_publications = false,
const SubscriptionOptions<Alloc> & options = SubscriptionOptions<Alloc>(),
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat = nullptr,
std::shared_ptr<Alloc> allocator = nullptr);
msg_mem_strat = nullptr);

/// Create a timer.
/**
Expand Down Expand Up @@ -600,6 +549,18 @@ class Node : public std::enable_shared_from_this<Node>
const NodeOptions &
get_node_options() const;


/// Manually assert that this Node is alive (for RMW_QOS_POLICY_MANUAL_BY_NODE)
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_NODE, the creator of this
* node must manually call `assert_liveliness` on a regular basis to signal to the rest of the
* system that this Node is still alive.
* This function must be called at least as often as the qos_profile's liveliness_lease_duration
*/
RCLCPP_PUBLIC
void
assert_liveliness() {}

protected:
/// Construct a sub-node, which will extend the namespace of all entities created with it.
/**
Expand Down
66 changes: 13 additions & 53 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,6 @@
namespace rclcpp
{

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator)
{
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
return this->create_publisher<MessageT, Alloc, PublisherT>(topic_name, qos, allocator);
}

RCLCPP_LOCAL
inline
std::string
Expand All @@ -80,17 +66,21 @@ extend_name_with_sub_namespace(const std::string & name, const std::string & sub
template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator)
const std::string & topic_name,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
const PublisherOptions<Alloc> & options)
{
std::shared_ptr<Alloc> allocator = options.allocator();
if (!allocator) {
allocator = std::make_shared<Alloc>();
}

return rclcpp::create_publisher<MessageT, Alloc, PublisherT>(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile,
options.qos_profile(),
options.event_callbacks(),
group,
this->get_node_options().use_intra_process_comms(),
allocator);
}
Expand All @@ -104,16 +94,15 @@ std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
const SubscriptionOptions<Alloc> & options,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
msg_mem_strat)
{
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;

std::shared_ptr<Alloc> allocator = options.allocator();
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
Expand All @@ -127,44 +116,15 @@ Node::create_subscription(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
qos_profile,
options.qos_profile(),
options.event_callbacks(),
group,
ignore_local_publications,
options.ignore_local_publications(),
this->get_node_options().use_intra_process_comms(),
msg_mem_strat,
allocator);
}

template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator)
{
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;

return this->create_subscription<MessageT>(
topic_name,
std::forward<CallbackT>(callback),
qos,
group,
ignore_local_publications,
msg_mem_strat,
allocator);
}

template<typename DurationRepT, typename DurationT, typename CallbackT>
typename rclcpp::WallTimer<CallbackT>::SharedPtr
Node::create_wall_timer(
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/node_interfaces/node_topics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher);
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);

RCLCPP_PUBLIC
virtual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher) = 0;
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0;

RCLCPP_PUBLIC
virtual
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/parameter_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class AsyncParametersClient
"parameter_events",
std::forward<CallbackT>(callback),
rmw_qos_profile_default,
SubscriptionEventCallbacks(),
nullptr, // group,
false, // ignore_local_publications,
false, // use_intra_process_comms_,
Expand Down
Loading