From 55f58bb719ba65361774e7c5f49eb512cbf8c5bc Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 9 Aug 2021 16:19:08 +0200 Subject: [PATCH] [Galactic] Loan messages implementation (#547) * Loan messages implementation (#523) * Added is_plain_ attribute to base TypeSupport. * Added new methods to base TypeSupport. * Implementation of rmw_borrow_loaned_message. * Implementation of rmw_return_loaned_message_from_publisher. * Enable loan messages on publishers of plain types. * Implementation for taking loaned messages. * Enable loan messages on subscriptions of plain types. Signed-off-by: Miguel Company Co-authored-by: Michel Hidalgo * Changes to work with galactic type support. Signed-off-by: Miguel Company * Improve return code. Signed-off-by: Miguel Company * Prepare type size for alignment requirements. Signed-off-by: Miguel Company * Add comments on alignment code. Signed-off-by: Miguel Company Co-authored-by: Michel Hidalgo --- rmw_fastrtps_cpp/src/publisher.cpp | 7 +- rmw_fastrtps_cpp/src/rmw_client.cpp | 4 + rmw_fastrtps_cpp/src/rmw_publish.cpp | 8 +- rmw_fastrtps_cpp/src/rmw_publisher.cpp | 16 +-- rmw_fastrtps_cpp/src/rmw_service.cpp | 4 + rmw_fastrtps_cpp/src/rmw_take.cpp | 30 ++-- rmw_fastrtps_cpp/src/subscription.cpp | 5 +- rmw_fastrtps_cpp/src/type_support_common.cpp | 2 + .../MessageTypeSupport_impl.hpp | 2 + .../ServiceTypeSupport_impl.hpp | 4 + rmw_fastrtps_dynamic_cpp/src/publisher.cpp | 6 +- rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp | 4 + rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp | 8 +- .../src/rmw_publisher.cpp | 16 +-- rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp | 4 + rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp | 31 ++-- rmw_fastrtps_dynamic_cpp/src/subscription.cpp | 5 +- .../src/type_support_proxy.cpp | 1 + .../rmw_fastrtps_shared_cpp/TypeSupport.hpp | 18 +++ .../custom_subscriber_info.hpp | 7 + .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 39 +++++ .../rmw_fastrtps_shared_cpp/subscription.hpp | 5 + rmw_fastrtps_shared_cpp/src/rmw_publish.cpp | 31 ++++ rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp | 55 +++++++ rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 136 ++++++++++++++++++ 25 files changed, 365 insertions(+), 83 deletions(-) diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index a4b80fa30..db401b761 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -49,6 +49,8 @@ #include "type_support_common.hpp" +using DataSharingKind = eprosima::fastdds::dds::DataSharingKind; + rmw_publisher_t * rmw_fastrtps_cpp::create_publisher( const CustomParticipantInfo * participant_info, @@ -248,6 +250,8 @@ rmw_fastrtps_cpp::create_publisher( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } // Get QoS from RMW @@ -291,7 +295,8 @@ rmw_fastrtps_cpp::create_publisher( rmw_publisher_free(rmw_publisher); }); - rmw_publisher->can_loan_messages = false; + bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind(); + rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain(); rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 171fbdc50..d10a61f41 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -311,6 +311,8 @@ rmw_create_client( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -359,6 +361,8 @@ rmw_create_client( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_cpp/src/rmw_publish.cpp b/rmw_fastrtps_cpp/src/rmw_publish.cpp index f51354ced..9a7331f26 100644 --- a/rmw_fastrtps_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publish.cpp @@ -51,11 +51,7 @@ rmw_publish_loaned_message( void * ros_message, rmw_publisher_allocation_t * allocation) { - (void) publisher; - (void) ros_message; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_publish_loaned_message not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message( + eprosima_fastrtps_identifier, publisher, ros_message, allocation); } } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 81ad0c1e7..6ee0fd154 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -169,12 +169,8 @@ rmw_borrow_loaned_message( const rosidl_message_type_support_t * type_support, void ** ros_message) { - (void) publisher; - (void) type_support; - (void) ros_message; - - RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message( + eprosima_fastrtps_identifier, publisher, type_support, ros_message); } rmw_ret_t @@ -182,12 +178,8 @@ rmw_return_loaned_message_from_publisher( const rmw_publisher_t * publisher, void * loaned_message) { - (void) publisher; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_publisher not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher( + eprosima_fastrtps_identifier, publisher, loaned_message); } rmw_ret_t diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index 2345b71a6..7ac758fa6 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -310,6 +310,8 @@ rmw_create_service( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -362,6 +364,8 @@ rmw_create_service( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_cpp/src/rmw_take.cpp b/rmw_fastrtps_cpp/src/rmw_take.cpp index e6f39d3c4..797c1c76c 100644 --- a/rmw_fastrtps_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_cpp/src/rmw_take.cpp @@ -91,13 +91,9 @@ rmw_take_loaned_message( bool * taken, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr); } rmw_ret_t @@ -108,14 +104,10 @@ rmw_take_loaned_message_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) message_info; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info); } rmw_ret_t @@ -123,12 +115,8 @@ rmw_return_loaned_message_from_subscription( const rmw_subscription_t * subscription, void * loaned_message) { - (void) subscription; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_subscription not implemented for rmw_fastrtps_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription( + eprosima_fastrtps_identifier, subscription, loaned_message); } rmw_ret_t diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 4affb8ec1..557ad0549 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -43,6 +43,7 @@ #include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp" #include "rmw_fastrtps_shared_cpp/qos.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/subscription.hpp" #include "rmw_fastrtps_shared_cpp/utils.hpp" #include "rmw_fastrtps_cpp/identifier.hpp" @@ -242,6 +243,8 @@ create_subscription( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -324,7 +327,7 @@ create_subscription( return nullptr; } rmw_subscription->options = *subscription_options; - rmw_subscription->can_loan_messages = false; + rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription); topic.should_be_deleted = false; cleanup_rmw_subscription.cancel(); diff --git a/rmw_fastrtps_cpp/src/type_support_common.cpp b/rmw_fastrtps_cpp/src/type_support_common.cpp index df8cfa101..f5fac45c5 100644 --- a/rmw_fastrtps_cpp/src/type_support_common.cpp +++ b/rmw_fastrtps_cpp/src/type_support_common.cpp @@ -47,6 +47,8 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members) // Total size is encapsulation size + data size m_typeSize = 4 + data_size; + // Account for RTPS submessage alignment + m_typeSize = (m_typeSize + 3) & ~3; } size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp index 298e2485f..fef6455cb 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/MessageTypeSupport_impl.hpp @@ -59,6 +59,8 @@ MessageTypeSupport::MessageTypeSupport( } else { this->m_typeSize++; } + // Account for RTPS submessage alignment + this->m_typeSize = (this->m_typeSize + 3) & ~3; } } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp index b14959bfb..9f6dd5572 100644 --- a/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp +++ b/rmw_fastrtps_dynamic_cpp/include/rmw_fastrtps_dynamic_cpp/ServiceTypeSupport_impl.hpp @@ -58,6 +58,8 @@ RequestTypeSupport::RequestTypeSupport( } else { this->m_typeSize++; } + // Account for RTPS submessage alignment + this->m_typeSize = (this->m_typeSize + 3) & ~3; } template @@ -88,6 +90,8 @@ ResponseTypeSupport::ResponseTypeSupport } else { this->m_typeSize++; } + // Account for RTPS submessage alignment + this->m_typeSize = (this->m_typeSize + 3) & ~3; } } // namespace rmw_fastrtps_dynamic_cpp diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index e5e51dba7..6235cf776 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -49,6 +49,7 @@ #include "type_support_common.hpp" #include "type_support_registry.hpp" +using DataSharingKind = eprosima::fastdds::dds::DataSharingKind; using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy; rmw_publisher_t * @@ -263,6 +264,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } // Get QoS from RMW @@ -306,7 +309,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher( rmw_publisher_free(rmw_publisher); }); - rmw_publisher->can_loan_messages = false; + bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind(); + rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain(); rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 696dab61b..0f8a82391 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -342,6 +342,8 @@ rmw_create_client( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -390,6 +392,8 @@ rmw_create_client( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp index a9d09e3b2..c3c396577 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp @@ -41,12 +41,8 @@ rmw_publish_loaned_message( void * ros_message, rmw_publisher_allocation_t * allocation) { - (void) publisher; - (void) ros_message; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_publish_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message( + eprosima_fastrtps_identifier, publisher, ros_message, allocation); } rmw_ret_t diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index 596bdded6..1559b42be 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -170,12 +170,8 @@ rmw_borrow_loaned_message( const rosidl_message_type_support_t * type_support, void ** ros_message) { - (void) publisher; - (void) type_support; - (void) ros_message; - - RMW_SET_ERROR_MSG("rmw_borrow_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message( + eprosima_fastrtps_identifier, publisher, type_support, ros_message); } rmw_ret_t @@ -183,12 +179,8 @@ rmw_return_loaned_message_from_publisher( const rmw_publisher_t * publisher, void * loaned_message) { - (void) publisher; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_publisher is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher( + eprosima_fastrtps_identifier, publisher, loaned_message); } using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport; diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index bb0f7b567..214d4e1a5 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -341,6 +341,8 @@ rmw_create_service( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -393,6 +395,8 @@ rmw_create_service( writer_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + writer_qos.data_sharing().off(); } if (!get_datawriter_qos(*qos_policies, writer_qos)) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp index b1780efa4..29e36633a 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp @@ -91,13 +91,9 @@ rmw_take_loaned_message( bool * taken, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) allocation; - - RMW_SET_ERROR_MSG("rmw_take_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr); } rmw_ret_t @@ -108,15 +104,10 @@ rmw_take_loaned_message_with_info( rmw_message_info_t * message_info, rmw_subscription_allocation_t * allocation) { - (void) subscription; - (void) loaned_message; - (void) taken; - (void) message_info; - (void) allocation; - - RMW_SET_ERROR_MSG( - "rmw_take_loaned_message_with_info is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + static_cast(allocation); + RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT); + return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal( + eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info); } rmw_ret_t @@ -124,12 +115,8 @@ rmw_return_loaned_message_from_subscription( const rmw_subscription_t * subscription, void * loaned_message) { - (void) subscription; - (void) loaned_message; - - RMW_SET_ERROR_MSG( - "rmw_return_loaned_message_from_subscription is not implemented for rmw_fastrtps_dynamic_cpp"); - return RMW_RET_UNSUPPORTED; + return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription( + eprosima_fastrtps_identifier, subscription, loaned_message); } rmw_ret_t diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 86301ff17..4d2415220 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -40,6 +40,7 @@ #include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp" #include "rmw_fastrtps_shared_cpp/qos.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/subscription.hpp" #include "rmw_fastrtps_shared_cpp/utils.hpp" #include "fastrtps/participant/Participant.h" @@ -256,6 +257,8 @@ create_subscription( if (!participant_info->leave_middleware_default_qos) { reader_qos.endpoint().history_memory_policy = eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + + reader_qos.data_sharing().off(); } if (!get_datareader_qos(*qos_policies, reader_qos)) { @@ -338,7 +341,7 @@ create_subscription( memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); rmw_subscription->options = *subscription_options; - rmw_subscription->can_loan_messages = false; + rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription); topic.should_be_deleted = false; cleanup_rmw_subscription.cancel(); diff --git a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp index cc9eead5d..6bf24a479 100644 --- a/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/type_support_proxy.cpp @@ -21,6 +21,7 @@ TypeSupportProxy::TypeSupportProxy(rmw_fastrtps_shared_cpp::TypeSupport * inner_ { setName(inner_type->getName()); m_typeSize = inner_type->m_typeSize; + max_size_bound_ = inner_type->is_plain(); } size_t TypeSupportProxy::getEstimatedSerializedSize( diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp index b31469318..e21142ea6 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/TypeSupport.hpp @@ -77,6 +77,24 @@ class TypeSupport : public eprosima::fastdds::dds::TopicDataType RMW_FASTRTPS_SHARED_CPP_PUBLIC void deleteData(void * data) override; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + inline bool is_bounded() const +#ifdef TOPIC_DATA_TYPE_API_HAS_IS_BOUNDED + override +#endif + { + return max_size_bound_; + } + + RMW_FASTRTPS_SHARED_CPP_PUBLIC + inline bool is_plain() const +#ifdef TOPIC_DATA_TYPE_API_HAS_IS_PLAIN + override +#endif + { + return max_size_bound_; + } + RMW_FASTRTPS_SHARED_CPP_PUBLIC virtual ~TypeSupport() {} diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 105a71ad1..f9c261581 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -40,6 +41,11 @@ class SubListener; +namespace rmw_fastrtps_shared_cpp +{ +struct LoanManager; +} // namespace rmw_fastrtps_shared_cpp + struct CustomSubscriberInfo : public CustomEventInfo { virtual ~CustomSubscriberInfo() = default; @@ -50,6 +56,7 @@ struct CustomSubscriberInfo : public CustomEventInfo const void * type_support_impl_{nullptr}; rmw_gid_t subscription_gid_{}; const char * typesupport_identifier_{nullptr}; + std::shared_ptr loan_manager_; RMW_FASTRTPS_SHARED_CPP_PUBLIC EventListenerInterface * diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 082e5629a..7c187b31d 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -144,6 +144,29 @@ __rmw_publish_serialized_message( const rmw_serialized_message_t * serialized_message, rmw_publisher_allocation_t * allocation); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_borrow_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const rosidl_message_type_support_t * type_support, + void ** ros_message); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_return_loaned_message_from_publisher( + const char * identifier, + const rmw_publisher_t * publisher, + void * loaned_message); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_publish_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const void * ros_message, + rmw_publisher_allocation_t * allocation); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_publisher_assert_liveliness( @@ -307,6 +330,22 @@ __rmw_take_sequence( size_t * taken, rmw_subscription_allocation_t * allocation); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_take_loaned_message_internal( + const char * identifier, + const rmw_subscription_t * subscription, + void ** loaned_message, + bool * taken, + rmw_message_info_t * message_info); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_return_loaned_message_from_subscription( + const char * identifier, + const rmw_subscription_t * subscription, + void * loaned_message); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_take_event( diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp index 523b916b3..1a3394b47 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/subscription.hpp @@ -22,6 +22,11 @@ namespace rmw_fastrtps_shared_cpp { +RMW_FASTRTPS_SHARED_CPP_PUBLIC +void +__init_subscription_for_loans( + rmw_subscription_t * subscription); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t destroy_subscription( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp index cc1d7e23f..a6e07a7c7 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publish.cpp @@ -108,4 +108,35 @@ __rmw_publish_serialized_message( return RMW_RET_OK; } +rmw_ret_t +__rmw_publish_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const void * ros_message, + rmw_publisher_allocation_t * allocation) +{ + static_cast(allocation); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_INVALID_ARGUMENT); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); + + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, publisher->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!publisher->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(publisher->data); + if (!info->data_writer_->write(const_cast(ros_message))) { + RMW_SET_ERROR_MSG("cannot publish data"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 8a1bd6a0d..73f6d0f5e 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -132,4 +132,59 @@ __rmw_publisher_get_actual_qos( return RMW_RET_OK; } + +rmw_ret_t +__rmw_borrow_loaned_message( + const char * identifier, + const rmw_publisher_t * publisher, + const rosidl_message_type_support_t * type_support, + void ** ros_message) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, publisher->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!publisher->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(type_support, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(ros_message, RMW_RET_INVALID_ARGUMENT); + if (nullptr != *ros_message) { + return RMW_RET_INVALID_ARGUMENT; + } + + auto info = static_cast(publisher->data); + if (!info->data_writer_->loan_sample(*ros_message)) { + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} + +rmw_ret_t +__rmw_return_loaned_message_from_publisher( + const char * identifier, + const rmw_publisher_t * publisher, + void * loaned_message) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + publisher, publisher->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!publisher->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(publisher->data); + if (!info->data_writer_->discard_loan(loaned_message)) { + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index 729db99ba..a2274eed1 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rmw/allocators.h" #include "rmw/error_handling.h" #include "rmw/serialized_message.h" @@ -19,18 +21,23 @@ #include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastrtps/utils/collections/ResourceLimitedVector.hpp" + #include "fastcdr/Cdr.h" #include "fastcdr/FastBuffer.h" #include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp" #include "rmw_fastrtps_shared_cpp/guid_utils.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/subscription.hpp" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw_fastrtps_shared_cpp/utils.hpp" namespace rmw_fastrtps_shared_cpp { +using DataSharingKind = eprosima::fastdds::dds::DataSharingKind; + void _assign_message_info( const char * identifier, @@ -367,4 +374,133 @@ __rmw_take_serialized_message_with_info( return _take_serialized_message( identifier, subscription, serialized_message, taken, message_info, allocation); } + +// ----------------- Loans related code ------------------------- // + +struct GenericSequence : public eprosima::fastdds::dds::LoanableCollection +{ + GenericSequence() = default; + + void resize(size_type /*new_length*/) override + { + // This kind of collection should only be used with loans + throw std::bad_alloc(); + } +}; + +struct LoanManager +{ + struct Item + { + GenericSequence data_seq{}; + eprosima::fastdds::dds::SampleInfoSeq info_seq{}; + }; + + explicit LoanManager(const eprosima::fastrtps::ResourceLimitedContainerConfig & items_cfg) + : items(items_cfg) + { + } + + std::mutex mtx; + eprosima::fastrtps::ResourceLimitedVector items RCPPUTILS_TSA_GUARDED_BY(mtx); +}; + +void +__init_subscription_for_loans( + rmw_subscription_t * subscription) +{ + auto info = static_cast(subscription->data); + const auto & qos = info->data_reader_->get_qos(); + bool has_data_sharing = DataSharingKind::OFF != qos.data_sharing().kind(); + subscription->can_loan_messages = has_data_sharing && info->type_support_->is_plain(); + if (subscription->can_loan_messages) { + const auto & allocation_qos = qos.reader_resource_limits().outstanding_reads_allocation; + info->loan_manager_ = std::make_shared(allocation_qos); + } +} + +rmw_ret_t +__rmw_take_loaned_message_internal( + const char * identifier, + const rmw_subscription_t * subscription, + void ** loaned_message, + bool * taken, + rmw_message_info_t * message_info) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, subscription->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!subscription->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + + RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(subscription->data); + auto loan_mgr = info->loan_manager_; + std::unique_lock guard(loan_mgr->mtx); + auto item = loan_mgr->items.emplace_back(); + if (nullptr == item) { + RMW_SET_ERROR_MSG("Out of resources for loaned message info"); + return RMW_RET_BAD_ALLOC; + } + + while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(item->data_seq, item->info_seq, 1)) { + if (item->info_seq[0].valid_data) { + if (nullptr != message_info) { + _assign_message_info(identifier, message_info, &item->info_seq[0]); + } + *loaned_message = item->data_seq.buffer()[0]; + *taken = true; + info->listener_->update_has_data(info->data_reader_); + return RMW_RET_OK; + } + + // Should return loan before taking again + info->data_reader_->return_loan(item->data_seq, item->info_seq); + } + + // No data available, return loan information. + loan_mgr->items.pop_back(); + *taken = false; + info->listener_->update_has_data(info->data_reader_); + return RMW_RET_OK; +} + +rmw_ret_t +__rmw_return_loaned_message_from_subscription( + const char * identifier, + const rmw_subscription_t * subscription, + void * loaned_message) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription, subscription->implementation_identifier, identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + if (!subscription->can_loan_messages) { + RMW_SET_ERROR_MSG("Loaning is not supported"); + return RMW_RET_UNSUPPORTED; + } + RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(subscription->data); + auto loan_mgr = info->loan_manager_; + std::lock_guard guard(loan_mgr->mtx); + for (auto it = loan_mgr->items.begin(); it != loan_mgr->items.end(); ++it) { + if (loaned_message == it->data_seq.buffer()[0]) { + if (!info->data_reader_->return_loan(it->data_seq, it->info_seq)) { + RMW_SET_ERROR_MSG("Error returning loan"); + return RMW_RET_ERROR; + } + loan_mgr->items.erase(it); + return RMW_RET_OK; + } + } + + RMW_SET_ERROR_MSG("Trying to return message not loaned by this subscription"); + return RMW_RET_ERROR; +} } // namespace rmw_fastrtps_shared_cpp