Skip to content

Commit

Permalink
[Galactic] Loan messages implementation (ros2#547)
Browse files Browse the repository at this point in the history
* Loan messages implementation (ros2#523)

* Added is_plain_ attribute to base TypeSupport.
* Added new methods to base TypeSupport.
* Implementation of rmw_borrow_loaned_message.
* Implementation of rmw_return_loaned_message_from_publisher.
* Enable loan messages on publishers of plain types.
* Implementation for taking loaned messages.
* Enable loan messages on subscriptions of plain types.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Michel Hidalgo <michel@ekumenlabs.com>

* Changes to work with galactic type support.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Improve return code.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Prepare type size for alignment requirements.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Add comments on alignment code.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Michel Hidalgo <michel@ekumenlabs.com>
  • Loading branch information
2 people authored and mehmetkillioglu committed Aug 10, 2022
1 parent 416cb16 commit 55f58bb
Show file tree
Hide file tree
Showing 25 changed files with 365 additions and 83 deletions.
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

#include "type_support_common.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;

rmw_publisher_t *
rmw_fastrtps_cpp::create_publisher(
const CustomParticipantInfo * participant_info,
Expand Down Expand Up @@ -248,6 +250,8 @@ rmw_fastrtps_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -291,7 +295,8 @@ rmw_fastrtps_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -359,6 +361,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}
} // extern "C"
16 changes: 4 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

rmw_ret_t
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -362,6 +364,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
30 changes: 9 additions & 21 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ rmw_take_loaned_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr);
}

rmw_ret_t
Expand All @@ -108,27 +104,19 @@ rmw_take_loaned_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) message_info;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info);
}

rmw_ret_t
rmw_return_loaned_message_from_subscription(
const rmw_subscription_t * subscription,
void * loaned_message)
{
(void) subscription;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_subscription not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription(
eprosima_fastrtps_identifier, subscription, loaned_message);
}

rmw_ret_t
Expand Down
5 changes: 4 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp"
#include "rmw_fastrtps_shared_cpp/qos.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "rmw_fastrtps_shared_cpp/subscription.hpp"
#include "rmw_fastrtps_shared_cpp/utils.hpp"

#include "rmw_fastrtps_cpp/identifier.hpp"
Expand Down Expand Up @@ -242,6 +243,8 @@ create_subscription(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -324,7 +327,7 @@ create_subscription(
return nullptr;
}
rmw_subscription->options = *subscription_options;
rmw_subscription->can_loan_messages = false;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)

// Total size is encapsulation size + data size
m_typeSize = 4 + data_size;
// Account for RTPS submessage alignment
m_typeSize = (m_typeSize + 3) & ~3;
}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
} else {
this->m_typeSize++;
}
// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
} else {
this->m_typeSize++;
}
// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}

template<typename ServiceMembersType, typename MessageMembersType>
Expand Down Expand Up @@ -88,6 +90,8 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
} else {
this->m_typeSize++;
}
// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "type_support_common.hpp"
#include "type_support_registry.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;
using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;

rmw_publisher_t *
Expand Down Expand Up @@ -263,6 +264,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -306,7 +309,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -390,6 +392,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,8 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}

rmw_ret_t
Expand Down
16 changes: 4 additions & 12 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -393,6 +395,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
31 changes: 9 additions & 22 deletions rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ rmw_take_loaned_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr);
}

rmw_ret_t
Expand All @@ -108,28 +104,19 @@ rmw_take_loaned_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) message_info;
(void) allocation;

RMW_SET_ERROR_MSG(
"rmw_take_loaned_message_with_info is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info);
}

rmw_ret_t
rmw_return_loaned_message_from_subscription(
const rmw_subscription_t * subscription,
void * loaned_message)
{
(void) subscription;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_subscription is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription(
eprosima_fastrtps_identifier, subscription, loaned_message);
}

rmw_ret_t
Expand Down
Loading

0 comments on commit 55f58bb

Please sign in to comment.