-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Magic numbers in serialization #36
Comments
Those "magic numbers" were a consequence of not having fragmentation in Now FastRTPS has fragmentation and you can remove that limitation. |
@richiware I've been trying to remove these restrictions and test out our demos that send images. I am trying to do this by using the FastBuffer with internally managed memory (the default constructor) on publish/serialize: diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
index eeac5f8..7e1bfdf 100644
--- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
+++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
@@ -837,7 +837,8 @@ bool TypeSupport<MembersType>::serializeROSmessage(
assert(buffer);
assert(ros_message);
- eprosima::fastcdr::FastBuffer fastbuffer(buffer->pointer, m_typeSize);
+ // eprosima::fastcdr::FastBuffer fastbuffer(buffer->pointer, m_typeSize);
+ eprosima::fastcdr::FastBuffer fastbuffer;
eprosima::fastcdr::Cdr ser(fastbuffer);
if(members_->member_count_ != 0) To do this I had to make some changes to FastCDR so it would try resizing before giving up when deserializing: diff --git a/src/cpp/Cdr.cpp b/src/cpp/Cdr.cpp
index 92ea900..74ffc63 100644
--- a/src/cpp/Cdr.cpp
+++ b/src/cpp/Cdr.cpp
@@ -1025,7 +1025,7 @@ Cdr& Cdr::deserialize(int32_t &long_t)
size_t align = alignment(sizeof(long_t));
size_t sizeAligned = sizeof(long_t) + align;
- if((m_lastPosition - m_currentPosition) >= sizeAligned)
+ if((m_lastPosition - m_currentPosition) >= sizeAligned || resize(sizeAligned))
{
// Save last datasize.
m_lastDataSize = sizeof(long_t);
@@ -1326,7 +1326,7 @@ const char* Cdr::readString(uint32_t &length)
{
return returnedValue;
}
- else if((m_lastPosition - m_currentPosition) >= length)
+ else if((m_lastPosition - m_currentPosition) >= length || resize(length))
{
// Save last datasize.
m_lastDataSize = sizeof(uint8_t);
@@ -1375,7 +1375,7 @@ Cdr& Cdr::deserializeArray(char *char_t, size_t numElements)
{
size_t totalSize = sizeof(*char_t)*numElements;
- if((m_lastPosition - m_currentPosition) >= totalSize)
+ if((m_lastPosition - m_currentPosition) >= totalSize || resize(totalSize))
{
// Save last datasize.
m_lastDataSize = sizeof(*char_t); I think there are other places that will be necessary, but the above is what I had to do to get past the "not enough space in buffer" errors. I managed to get to the point where I could publish large messages, but I'm currently running into the issue that the taken messages are all empty on the subscriber side. I've done a few things:
diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
index eeac5f8..7e1bfdf 100644
--- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
+++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
@@ -235,10 +235,10 @@ void serialize_array(
ser.serializeArray((T*)field, member->array_size_);
} else {
std::vector<T> & data = *reinterpret_cast<std::vector<T> *>(field);
- if(data.size() > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
- printf("vector overcomes the maximum length\n");
- throw std::runtime_error("vector overcomes the maximum length");
- }
+ // if(data.size() > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
+ // printf("vector overcomes the maximum length\n");
+ // throw std::runtime_error("vector overcomes the maximum length");
+ // }
ser << data;
}
}
@@ -255,10 +255,10 @@ void serialize_array(
ser.serializeArray((T*)field, member->array_size_);
} else {
auto & data = *reinterpret_cast<typename GenericCArray<T>::type *>(field);
- if(data.size > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
- printf("vector overcomes the maximum length\n");
- throw std::runtime_error("vector overcomes the maximum length");
- }
+ // if(data.size > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
+ // printf("vector overcomes the maximum length\n");
+ // throw std::runtime_error("vector overcomes the maximum length");
+ // }
ser.serializeArray((T*)data.data, data.size);
}
}
@@ -278,11 +278,11 @@ void serialize_array<std::string>(
ser.serializeArray(string_field->data, member->array_size_);
} else {
auto & string_array_field = *reinterpret_cast<rosidl_generator_c__String__Array *>(field);
- if(
- string_array_field.size > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
- printf("vector overcomes the maximum length\n");
- throw std::runtime_error("vector overcomes the maximum length");
- }
+ // if(
+ // string_array_field.size > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
+ // printf("vector overcomes the maximum length\n");
+ // throw std::runtime_error("vector overcomes the maximum length");
+ // }
std::vector<std::string> cpp_string_vector;
for (size_t i = 0; i < string_array_field.size; ++i) {
cpp_string_vector.push_back(CStringHelper::convert_to_std_string(string_array_field.data[i]));
@@ -302,10 +302,10 @@ size_t get_array_size_and_assign_field(
std::vector<unsigned char> *vector = reinterpret_cast<std::vector<unsigned char> *>(field);
void *ptr = (void*)sub_members_size;
size_t vsize = vector->size() / (size_t)align_(max_align, 0, ptr, space);
- if (member->is_upper_bound_ && vsize > member->array_size_) {
- printf("vector overcomes the maximum length\n");
- throw std::runtime_error("vector overcomes the maximum length");
- }
+ // if (member->is_upper_bound_ && vsize > member->array_size_) {
+ // printf("vector overcomes the maximum length\n");
+ // throw std::runtime_error("vector overcomes the maximum length");
+ // }
subros_message = reinterpret_cast<void*>(vector->data());
return vsize;
}
diff --git a/rmw_fastrtps_cpp/src/functions.cpp b/rmw_fastrtps_cpp/src/functions.cpp
index fe09485..f05a564 100644
--- a/rmw_fastrtps_cpp/src/functions.cpp
+++ b/rmw_fastrtps_cpp/src/functions.cpp
@@ -728,6 +728,7 @@ extern "C"
publisherParam.topic.topicKind = NO_KEY;
publisherParam.topic.topicDataType = type_name;
publisherParam.topic.topicName = topic_name;
+ publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE;
if(!get_datawriter_qos(*qos_policies, publisherParam))
goto fail;
diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.h b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.h
index 86799d9..8366285 100644
--- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.h
+++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageTypeSupport_impl.h
@@ -22,6 +22,7 @@
#include <fastcdr/Cdr.h>
#include <cassert>
+#include <limits>
#include <memory>
using namespace rmw_fastrtps_cpp;
@@ -38,10 +39,11 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(const MembersType *members)
std::string name = std::string(members->package_name_) + "::msg::dds_::" + members->message_name_ + "_";
this->setName(name.c_str());
- if(members->member_count_ != 0)
- this->m_typeSize = static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0));
- else
- this->m_typeSize = 1;
+ // if(members->member_count_ != 0)
+ // this->m_typeSize = static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0));
+ // else
+ // this->m_typeSize = 1;
+ this->m_typeSize = std::numeric_limits<uint32_t>::max();
}
#endif // _RMW_FASTRTPS_CPP_MESSAGETYPESUPPORT_IMPL_H_ That's about it. The total set of changes can be seen in these two branches:
I'm going to continue debugging this issue, but I'd appreciate any input you might have. |
I figured out that my changes around using the dynamically resizing FastBuffer resulted in writing nothing when publishing. So now I'm trying to figure out how to specify the payload size at publish time. It seems to me that the payload size doesn't change per publish but per instance. Maybe an assumption that the "serialized size" doesn't change? I'm going to keep digging... |
Nevermind, again... The size is not passed to write, because the void * there gets passed back around to the virtual function that we define in this repositories code, so we cast it back to a structure which is the data pointer + a size. I'm currently going through and replacing that simple structure with the |
Ok, I have it working now, but the performance isn't as good as I was hoping it would be. Looking at wireshark it looks like we're sending everything twice to a unicast address and a multicast address. Is there a good way to control that? Maybe I'm mistaken about that too. Also, are there any fragmentation buffers or resource limits I can adjust? My naive guess is that the messages aren't arriving in full (some fragments are missing) and so many messages are getting dropped on the subscriber side. To reproduce what I have, you'll need the two branches I listed above. And then I run:
You can use If you guys have time to look into the performance issues we're having or have any suggestions I'll do my best to help test and integrate changes. |
Another interesting data point, thanks to @codebot for suggesting it: If I change the publish period in the cam2image program from the default 30 Hz to something like 1 Hz, or even a period of 5 seconds, I still only receive one image in about 100. I see this with both reliable and best-effort. This points to some kind of race condition on completing the fragments on the subscription side or some issue with sending them out on the publisher's side. The fact that I get more images on the subscriber side with an increased publisher rate seems to indicate to me that this is not a performance problem. Unfortunately there is no command line option for the publish rate of the cam2image, but you can change the value directly and recompile it: https://github.com/ros2/demos/blob/master/image_tools/src/cam2image.cpp#L134 Also, I forgot to mention I'm using a custom branch for that repository that adds the |
@wjwwood I was testing cam2img and showimage applications. I was working in diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
index eeac5f8..741d2c0 100644
--- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
+++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.h
@@ -235,7 +235,7 @@ void serialize_array(
ser.serializeArray((T*)field, member->array_size_);
} else {
std::vector<T> & data = *reinterpret_cast<std::vector<T> *>(field);
- if(data.size() > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
+ if(data.size() > (member->is_upper_bound_ ? member->array_size_ : 300000)) {
printf("vector overcomes the maximum length\n");
throw std::runtime_error("vector overcomes the maximum length");
}
@@ -255,7 +255,7 @@ void serialize_array(
ser.serializeArray((T*)field, member->array_size_);
} else {
auto & data = *reinterpret_cast<typename GenericCArray<T>::type *>(field);
- if(data.size > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
+ if(data.size > (member->is_upper_bound_ ? member->array_size_ : 300000)) {
printf("vector overcomes the maximum length\n");
throw std::runtime_error("vector overcomes the maximum length");
}
@@ -279,7 +279,7 @@ void serialize_array<std::string>(
} else {
auto & string_array_field = *reinterpret_cast<rosidl_generator_c__String__Array *>(field);
if(
- string_array_field.size > (member->is_upper_bound_ ? member->array_size_ : (typeTooLarge ? 30 : 101))) {
+ string_array_field.size > (member->is_upper_bound_ ? member->array_size_ : 300000)) {
printf("vector overcomes the maximum length\n");
throw std::runtime_error("vector overcomes the maximum length");
}
@@ -781,7 +781,7 @@ size_t TypeSupport<MembersType>::calculateMaxSerializedSize(
{
current_alignment += 4 + eprosima::fastcdr::Cdr::alignment(current_alignment, 4);
- if(array_size == 0) typeByDefaultLarge() ? array_size = 30 : array_size = 101;
+ if(array_size == 0) array_size = 300000;
}
switch(member->type_id_) Also I added extra configuration in the creation of publishers and subscribers: diff --git a/rmw_fastrtps_cpp/src/functions.cpp b/rmw_fastrtps_cpp/src/functions.cpp
index fe09485..7e09622 100644
--- a/rmw_fastrtps_cpp/src/functions.cpp
+++ b/rmw_fastrtps_cpp/src/functions.cpp
@@ -728,6 +728,16 @@ extern "C"
publisherParam.topic.topicKind = NO_KEY;
publisherParam.topic.topicDataType = type_name;
publisherParam.topic.topicName = topic_name;
+ publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE;
+
+ // 1 Heartbeat every 10ms
+ publisherParam.times.heartbeatPeriod.seconds = 0;
+ publisherParam.times.heartbeatPeriod.fraction = 42949673;
+
+ // 300000 bytes each 10ms
+ ThroughputControllerDescriptor throughputController{300000, 10};
+ publisherParam.terminalThroughputController = throughputController;
+
if(!get_datawriter_qos(*qos_policies, publisherParam))
goto fail;
@@ -984,6 +994,10 @@ fail:
subscriberParam.topic.topicKind = NO_KEY;
subscriberParam.topic.topicDataType = type_name;
subscriberParam.topic.topicName = topic_name;
+ Locator_t multicast_locator;
+ multicast_locator.set_IP4_address("239.255.0.1");
+ multicast_locator.port = 7600;
+ subscriberParam.multicastLocatorList.push_back(multicast_locator);
if(!get_datareader_qos(*qos_policies, subscriberParam))
goto fail;
I was testing all this in local machine and works well. Can you test this and check if performance increases? |
@richiprosima I applied the settings for the heartbeat and throughput controller, and narrowed the subscriber to only the multicast address. I believe I did this correctly (the params structure looks right in the debugger). I was already turning on asnyc publishing. However, I'm still seeing the behavior that only a few of the images get through. I have some thoughts on why that might be, first off I'm on OS X. I can try my experiments on Linux (at least in a VM) after I write this comment up. I'm also not working around the |
Ok, so my changes do not work on Linux, because of how I override the On OS X, this just works because of how OS X implements malloc, see: http://www.cocoawithlove.com/2010/05/look-at-how-malloc-works-on-mac.html So it was "hiding" the issue from me. So it seems that I need to figure out how to initially allocate the serialized payload's storage and resize them on demand, as is the case with the serializer now. I'll continue to look into that. |
Actually https://developer.apple.com/library/prerelease/content/documentation/Performance/Conceptual/ManagingMemory/Articles/MemoryAlloc.html#//apple_ref/doc/uid/20001881-99765 may be more enlightening. Thanks to @codebot for pointing that out to me. |
I just wanted to update on this thread that I was able to get this working on Linux and it behaves as expected (most if not all messages are received). I had to make some changes in FastRTPS to support this, however. I'll post those changes up as soon as I am able. |
After discussing with @richiprosima, I'm going to clean up my patches to Fast-CDR and Fast-RTPS. The goal is to get Fast-CDR and Fast-RTPS into a state where we can do all dynamic allocation for publishing and subscribing (no fixed max size for messages). Note: when I say "messages" I typically mean "samples" in DDS terms. Long term, I think it would be good serve these use cases:
The first case is already covered by Fast-RTPS. The second case is most similar to what happens in ROS 1, but is not supported by Fast-RTPS (as far as I know) at the moment. The third case is sort of what I have with my patches, i.e. it pre-allocates, but will realloc when necessary. However, with my patches the default "max message size" for all messages is 0 (I was lazy), so no actual pre-allocation occurs, but it does stop making new allocations in steady state. To meet all of these use cases, I think Fast-RTPS needs to have these options (and so does ROS 2):
A bonus setting would be (in the same vein as the last two "ideas"):
That last one is a bit more complicated to example, and I'm not sure if I explained it well. But it's really just another heuristic to make better decisions in the absence of explicit user data (maybe the user doesn't care, but we still want to deliver the best performance possible). However, we don't have to meet all of these use cases right now. I'll try to get my patches to the point that we can do some form of use case 3 or maybe use case 2, depending on how hard that is. This will incrementally improve the experience by allowing large image demos to work, but will still potentially use lots of memory in steady state (in the case of my simple use case 3) or be really inefficient (in the case of use case 2). Later we can try to do something smarter as the default behavior. Another facet to look at is: what we should do for default behavior of ROS 2. I'd propose these bullets as the default behavior:
I'm going to start working on fixing up my changes to Fast-RTPS and Fast-CDR. @richiprosima let me know if you see any red flags. |
https://github.com/eProsima/ROS-RMW-Fast-RTPS-cpp/blob/master/rmw_fastrtps_cpp/src/TypeSupport.cpp#L180\
https://github.com/eProsima/ROS-RMW-Fast-RTPS-cpp/blob/master/rmw_fastrtps_cpp/src/TypeSupport.cpp#L567
I'm confused about the reasoning behind some of the "magic numbers" in these conditionals (30, 101, 257).
The text was updated successfully, but these errors were encountered: