Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TypeAdapted publishing with large messages. #2443

Merged
merged 1 commit into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ class Publisher : public PublisherBase
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
return this->do_inter_process_publish(msg);
this->do_inter_process_publish(msg);
return;
}
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
// As the message is not const, a copy should be made.
Expand All @@ -318,22 +318,19 @@ class Publisher : public PublisherBase
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
// Avoid allocating when not using intra process.
if (!intra_process_is_enabled_) {
// In this case we're not using intra process.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, ros_msg);
return this->do_inter_process_publish(ros_msg);
auto ros_msg_ptr = std::make_unique<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
this->do_inter_process_publish(*ros_msg_ptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not also move the unique_ptr as all other functions do it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because do_inter_process_publish takes a constref, not a unique_ptr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea was more to change the

void
do_inter_process_publish(const ROSMessageType & msg)
to also take a unique_ptr, so that the interface would prevent this type of mistake. I had a further look and found, that it needs to be constref, as the external publish function uses it.

return;
}

bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
auto ros_msg_ptr = std::make_shared<ROSMessageType>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a shared_ptr here, better also use a unique_ptr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a shared_ptr here, better also use a unique_ptr

This one needs to be a shared_ptr because of the call to buffer_->add_shared below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh you are right, I overlooked that.

// TODO(clalancette): This is unnecessarily doing an additional conversion
// that may have already been done in do_intra_process_publish_and_return_shared().
// We should just reuse that effort.
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *ros_msg_ptr);
this->do_intra_process_publish(std::move(msg));
this->do_inter_process_publish(*ros_msg_ptr);
Expand Down Expand Up @@ -368,13 +365,12 @@ class Publisher : public PublisherBase
>
publish(const T & msg)
{
// Avoid double allocating when not using intra process.
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, ros_msg);
// In this case we're not using intra process.
return this->do_inter_process_publish(ros_msg);
auto ros_msg_ptr = std::make_unique<ROSMessageType>();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *ros_msg_ptr);
this->do_inter_process_publish(*ros_msg_ptr);
return;
}

// Otherwise we have to allocate memory in a unique_ptr and pass it along.
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/test/msg/LargeMessage.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# A message with a size larger than the default Linux stack size
uint8[10485760] data
uint64 size
1 change: 1 addition & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ add_definitions(-DTEST_RESOURCES_DIRECTORY="${TEST_RESOURCES_DIRECTORY}")

rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs
../msg/Header.msg
../msg/LargeMessage.msg
../msg/MessageWithHeader.msg
../msg/String.msg
DEPENDENCIES builtin_interfaces
Expand Down
71 changes: 70 additions & 1 deletion rclcpp/test/rclcpp/test_publisher_with_type_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.


#include <gtest/gtest.h>

#include <algorithm>
#include <chrono>
#include <memory>
#include <string>
Expand All @@ -25,6 +25,7 @@
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/rclcpp.hpp"

#include "rclcpp/msg/large_message.hpp"
#include "rclcpp/msg/string.hpp"


Expand Down Expand Up @@ -105,6 +106,32 @@ struct TypeAdapter<int, rclcpp::msg::String>
}
};

template<>
struct TypeAdapter<std::string, rclcpp::msg::LargeMessage>
{
using is_specialized = std::true_type;
using custom_type = std::string;
using ros_message_type = rclcpp::msg::LargeMessage;

static void
convert_to_ros_message(
const custom_type & source,
ros_message_type & destination)
{
destination.size = source.size();
std::memcpy(destination.data.data(), source.data(), source.size());
}

static void
convert_to_custom(
const ros_message_type & source,
custom_type & destination)
{
destination.resize(source.size);
std::memcpy(destination.data(), source.data.data(), source.size);
}
};

} // namespace rclcpp

/*
Expand Down Expand Up @@ -336,3 +363,45 @@ TEST_F(TestPublisher, check_type_adapted_message_is_sent_and_received) {
assert_message_was_received();
}
}

TEST_F(TestPublisher, test_large_message_unique)
{
// There have been some bugs in the past when trying to type-adapt large messages
// (larger than the stack size). Here we just make sure that a 10MB message works,
// which is larger than the default stack size on Linux.

using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::LargeMessage>;

auto node = std::make_shared<rclcpp::Node>("my_node", "/ns", rclcpp::NodeOptions());

const std::string topic_name = "topic_name";

auto pub = node->create_publisher<StringTypeAdapter>(topic_name, 1);

static constexpr size_t length = 10 * 1024 * 1024;
auto message_data = std::make_unique<std::string>();
message_data->reserve(length);
std::fill(message_data->begin(), message_data->begin() + length, '#');
pub->publish(std::move(message_data));
}

TEST_F(TestPublisher, test_large_message_constref)
{
// There have been some bugs in the past when trying to type-adapt large messages
// (larger than the stack size). Here we just make sure that a 10MB message works,
// which is larger than the default stack size on Linux.

using StringTypeAdapter = rclcpp::TypeAdapter<std::string, rclcpp::msg::LargeMessage>;

auto node = std::make_shared<rclcpp::Node>("my_node", "/ns", rclcpp::NodeOptions());

const std::string topic_name = "topic_name";

auto pub = node->create_publisher<StringTypeAdapter>(topic_name, 1);

static constexpr size_t length = 10 * 1024 * 1024;
std::string message_data;
message_data.reserve(length);
std::fill(message_data.begin(), message_data.begin() + length, '#');
pub->publish(message_data);
}