diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index e5d7fa39b1..b37c7f5b5a 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -176,8 +176,10 @@ class IntraProcessManager */ template< typename MessageT, + typename ROSMessageType, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete + > void do_intra_process_publish( uint64_t intra_process_publisher_id, @@ -203,7 +205,7 @@ class IntraProcessManager // None of the buffers require ownership, so we promote the pointer std::shared_ptr msg = std::move(message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( msg, sub_ids.take_shared_subscriptions); } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT sub_ids.take_shared_subscriptions.size() <= 1) @@ -214,8 +216,7 @@ class IntraProcessManager concatenated_vector.end(), sub_ids.take_ownership_subscriptions.begin(), sub_ids.take_ownership_subscriptions.end()); - - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), concatenated_vector, allocator); @@ -226,17 +227,19 @@ class IntraProcessManager // for the buffers that do not require ownership auto shared_msg = std::allocate_shared(allocator, *message); - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); } } template< typename MessageT, + typename ROSMessageType, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete + > std::shared_ptr do_intra_process_publish_and_return_shared( uint64_t intra_process_publisher_id, @@ -262,7 +265,7 @@ class IntraProcessManager // If there are no owning, just convert to shared. std::shared_ptr shared_msg = std::move(message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } return shared_msg; @@ -272,12 +275,12 @@ class IntraProcessManager auto shared_msg = std::allocate_shared(allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, sub_ids.take_shared_subscriptions); } if (!sub_ids.take_ownership_subscriptions.empty()) { - this->template add_owned_msg_to_buffers( + this->template add_owned_msg_to_buffers( std::move(message), sub_ids.take_ownership_subscriptions, allocator); @@ -334,14 +337,22 @@ class IntraProcessManager template< typename MessageT, typename Alloc, - typename Deleter> + typename Deleter, + typename ROSMessageType> void add_shared_msg_to_buffers( std::shared_ptr message, std::vector subscription_ids) { using PublishedType = typename rclcpp::TypeAdapter::custom_type; - using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + using PublishedTypeAllocatorTraits = allocator::AllocRebind; + using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; + using PublishedTypeDeleter = allocator::Deleter; + for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); @@ -351,11 +362,13 @@ class IntraProcessManager auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcessBuffer + rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (nullptr == subscription) { auto ros_message_subscription = std::dynamic_pointer_cast< - rclcpp::experimental::ROSMessageIntraProcessBuffer + rclcpp::experimental::ROSMessageIntraProcessBuffer >(subscription_base); if (nullptr == ros_message_subscription) { @@ -368,9 +381,22 @@ class IntraProcessManager if constexpr (rclcpp::TypeAdapter::is_specialized::value) { ROSMessageType ros_msg; rclcpp::TypeAdapter::convert_to_ros_message(*message, ros_msg); - ros_message_subscription->provide_intra_process_message(ros_msg); + ros_message_subscription->provide_intra_process_message( + std::make_shared(ros_msg)); } else { - ros_message_subscription->provide_intra_process_message(message); + if constexpr (std::is_same::value) { + ros_message_subscription->provide_intra_process_message(message); + } else { + if constexpr (std::is_same::ros_message_type, ROSMessageType>::value) + { + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message( + *message, ros_msg); + ros_message_subscription->provide_intra_process_message( + std::make_shared(ros_msg)); + } + } } } } else { @@ -385,7 +411,8 @@ class IntraProcessManager template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete, + typename ROSMessageType = MessageT> void add_owned_msg_to_buffers( std::unique_ptr message, @@ -395,7 +422,14 @@ class IntraProcessManager using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; using PublishedType = typename rclcpp::TypeAdapter::custom_type; - using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + using PublishedTypeAllocatorTraits = allocator::AllocRebind; + using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type; + using PublishedTypeDeleter = allocator::Deleter; for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); @@ -405,41 +439,44 @@ class IntraProcessManager auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcessBuffer + rclcpp::experimental::SubscriptionIntraProcessBuffer >(subscription_base); if (nullptr == subscription) { auto ros_message_subscription = std::dynamic_pointer_cast< - rclcpp::experimental::ROSMessageIntraProcessBuffer + rclcpp::experimental::ROSMessageIntraProcessBuffer >(subscription_base); if (nullptr == ros_message_subscription) { throw std::runtime_error( - "failed to dynamic cast SubscriptionIntraProcessBase to " + "--failed to dynamic cast SubscriptionIntraProcessBase to " "SubscriptionIntraProcessBuffer, which " "can happen when the publisher and subscription use different " "allocator types, which is not supported"); } else { if constexpr (rclcpp::TypeAdapter::is_specialized::value) { - using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1); ROSMessageTypeAllocatorTraits::construct(allocator, ptr); - Deleter deleter = message.get_deleter(); - auto ros_msg = std::unique_ptr(ptr, deleter); - rclcpp::TypeAdapter::convert_to_ros_message(message, ros_msg); + ROSMessageTypeDeleter deleter = message.get_deleter(); + rclcpp::TypeAdapter::convert_to_ros_message(message, *ptr); + auto ros_msg = std::unique_ptr(ptr, deleter); ros_message_subscription->provide_intra_process_message(std::move(ros_msg)); } else { - if (std::next(it) == subscription_ids.end()) { - // If this is the last subscription, give up ownership - ros_message_subscription->provide_intra_process_message(std::move(message)); - } else { - // Copy the message since we have additional subscriptions to serve - MessageUniquePtr copy_message; - Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(allocator, 1); - MessageAllocTraits::construct(allocator, ptr, *message); - copy_message = MessageUniquePtr(ptr, deleter); - - ros_message_subscription->provide_intra_process_message(std::move(copy_message)); + if constexpr (std::is_same::value) { + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + ros_message_subscription->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + MessageUniquePtr copy_message; + Deleter deleter = message.get_deleter(); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); + copy_message = MessageUniquePtr(ptr, deleter); + + ros_message_subscription->provide_intra_process_message(std::move(copy_message)); + } } } } diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index ee0bd8f163..803d940086 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -41,20 +41,24 @@ namespace experimental template< typename MessageT, typename SubscribedType, - typename SubscribedTypeAlloc = std::allocator, - typename SubscribedTypeDeleter = std::default_delete + typename SubscribedTypeAlloc = std::allocator, + typename SubscribedTypeDeleter = std::default_delete, + typename ROSMessageType = SubscribedType, + typename Alloc = std::allocator > class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< SubscribedType, SubscribedTypeAlloc, - SubscribedTypeDeleter + SubscribedTypeDeleter, + ROSMessageType > { using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer< SubscribedType, SubscribedTypeAlloc, - SubscribedTypeDeleter + SubscribedTypeDeleter, + ROSMessageType >; public: @@ -68,15 +72,15 @@ class SubscriptionIntraProcess using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; SubscriptionIntraProcess( - AnySubscriptionCallback callback, - std::shared_ptr allocator, + AnySubscriptionCallback callback, + std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) : SubscriptionIntraProcessBuffer( - allocator, + SubscribedTypeDeleter, ROSMessageType>( + std::make_shared(*allocator), context, topic_name, qos_profile, @@ -154,7 +158,7 @@ class SubscriptionIntraProcess shared_ptr.reset(); } - AnySubscriptionCallback any_callback_; + AnySubscriptionCallback any_callback_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index 43a834503d..b8883cff71 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -38,14 +38,16 @@ namespace experimental template< typename SubscribedType, - typename Alloc = std::allocator, + typename Alloc = std::allocator, typename Deleter = std::default_delete, /// MessageT::ros_message_type if MessageT is a TypeAdapter, /// otherwise just MessageT. - typename ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type + typename ROSMessageType = SubscribedType > -class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer +class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer::allocator_type, + allocator::Deleter::allocator_type, + ROSMessageType>> { public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer) @@ -76,8 +78,8 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer(context, topic_name, - qos_profile), + : ROSMessageIntraProcessBuffer( + context, topic_name, qos_profile), subscribed_type_allocator_(*allocator) { allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_); @@ -87,7 +89,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer( buffer_type, qos_profile, - allocator); + std::make_shared(subscribed_type_allocator_)); } bool diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aaeedf3768..407e8266bc 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -81,7 +81,6 @@ class Publisher : public PublisherBase /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. using PublishedType = typename rclcpp::TypeAdapter::custom_type; - /// MessageT::ros_message_type if MessageT is a TypeAdapter, otherwise just MessageT. using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; using PublishedTypeAllocatorTraits = allocator::AllocRebind; @@ -324,7 +323,26 @@ class Publisher : public PublisherBase // process manager and let it decide if it needs to be converted or not. // For now, convert it unconditionally and pass it the ROSMessageType // publish function specialization. - this->do_intra_process_publish(std::move(msg)); + + // Avoid allocating when not using intra process. + if (!intra_process_is_enabled_) { + // In this case we're not using intra process. + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + return this->do_inter_process_publish(ros_msg); + } + + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + this->do_inter_process_publish(ros_msg); + } else { + this->do_intra_process_publish(std::move(msg)); + } } /// Publish a message on the topic. @@ -501,7 +519,7 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), published_type_allocator_); @@ -519,7 +537,7 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), ros_message_type_allocator_); @@ -540,7 +558,7 @@ class Publisher : public PublisherBase } return ipm->template do_intra_process_publish_and_return_shared( + ROSMessageType, AllocatorT>( intra_process_publisher_id_, std::move(msg), published_type_allocator_); @@ -559,7 +577,7 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_sharedtemplate do_intra_process_publish_and_return_shared( intra_process_publisher_id_, std::move(msg), diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 376072c92c..acbc513dad 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -408,8 +408,10 @@ class Subscription : public SubscriptionBase using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, SubscribedType, - AllocatorT, - SubscribedTypeDeleter>; + SubscribedTypeAllocator, + SubscribedTypeDeleter, + ROSMessageT, + AllocatorT>; std::shared_ptr subscription_intra_process_; }; diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index e60b1636b8..c8292a530b 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -364,41 +364,41 @@ if(WIN32) set(append_library_dirs "${append_library_dirs}/$") endif() -# ament_add_gtest(test_publisher_with_type_adapter test_publisher_with_type_adapter.cpp -# APPEND_LIBRARY_DIRS "${append_library_dirs}" -# ) -# if(TARGET test_publisher_with_type_adapter) -# ament_target_dependencies(test_publisher_with_type_adapter -# "rcutils" -# "rcl_interfaces" -# "rmw" -# "rosidl_runtime_cpp" -# "rosidl_typesupport_cpp" -# "test_msgs" -# ) -# target_link_libraries(test_publisher_with_type_adapter -# ${PROJECT_NAME} -# mimick -# ${cpp_typesupport_target}) -# endif() - -# ament_add_gtest(test_subscription_with_type_adapter test_subscription_with_type_adapter.cpp -# APPEND_LIBRARY_DIRS "${append_library_dirs}" -# ) -# if(TARGET test_subscription_with_type_adapter) -# ament_target_dependencies(test_subscription_with_type_adapter -# "rcutils" -# "rcl_interfaces" -# "rmw" -# "rosidl_runtime_cpp" -# "rosidl_typesupport_cpp" -# "test_msgs" -# ) -# target_link_libraries(test_subscription_with_type_adapter -# ${PROJECT_NAME} -# mimick -# ${cpp_typesupport_target}) -# endif() +ament_add_gtest(test_publisher_with_type_adapter test_publisher_with_type_adapter.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" +) +if(TARGET test_publisher_with_type_adapter) + ament_target_dependencies(test_publisher_with_type_adapter + "rcutils" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_publisher_with_type_adapter + ${PROJECT_NAME} + mimick + ${cpp_typesupport_target}) +endif() + +ament_add_gtest(test_subscription_with_type_adapter test_subscription_with_type_adapter.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" +) +if(TARGET test_subscription_with_type_adapter) + ament_target_dependencies(test_subscription_with_type_adapter + "rcutils" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_subscription_with_type_adapter + ${PROJECT_NAME} + mimick + ${cpp_typesupport_target}) +endif() ament_add_gtest(test_subscription_publisher_with_same_type_adapter test_subscription_publisher_with_same_type_adapter.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 45b781995d..47fc0aa83b 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -222,7 +222,9 @@ class SubscriptionIntraProcessBase template< typename MessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete> + typename Deleter = std::default_delete, + typename ROSMessageType = MessageT +> class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase { public: @@ -279,7 +281,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase template< typename MessageT, - typename Alloc = std::allocator, + typename Alloc = std::allocator, typename Deleter = std::default_delete> class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< MessageT, @@ -340,7 +342,7 @@ void Publisher::publish(MessageUniquePtr msg) throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), *message_allocator_);