diff --git a/src/cpp/rtps/history/WriterHistory.cpp b/src/cpp/rtps/history/WriterHistory.cpp index 28fdf946986..6a7184b52a8 100644 --- a/src/cpp/rtps/history/WriterHistory.cpp +++ b/src/cpp/rtps/history/WriterHistory.cpp @@ -358,9 +358,16 @@ void WriterHistory::set_fragments( // If inlineqos for related_sample_identity is required, then remove its size from the final fragment size. if (0 < inline_qos_size) { - final_high_mark_for_frag -= ( - fastdds::dds::ParameterSerializer::PARAMETER_SENTINEL_SIZE + - inline_qos_size); + uint32_t overhead = fastdds::dds::ParameterSerializer::PARAMETER_SENTINEL_SIZE + inline_qos_size; + constexpr uint32_t min_fragment_size = 4; + if (final_high_mark_for_frag < (overhead + min_fragment_size)) + { + final_high_mark_for_frag = min_fragment_size; + } + else + { + final_high_mark_for_frag -= overhead; + } } // If it is big data, fragment it. diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index a0b48ad210f..cc063ded78f 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2573,20 +2573,22 @@ uint32_t RTPSParticipantImpl::getMaxDataSize() uint32_t RTPSParticipantImpl::calculateMaxDataSize( uint32_t length) { - uint32_t maxDataSize = length; - + // RTPS header + uint32_t overhead = RTPSMESSAGE_HEADER_SIZE; #if HAVE_SECURITY // If there is rtps messsage protection, reduce max size for messages, // because extra data is added on encryption. if (security_attributes_.is_rtps_protected) { - maxDataSize -= m_security_manager.calculate_extra_size_for_rtps_message(); + overhead += m_security_manager.calculate_extra_size_for_rtps_message(); } #endif // if HAVE_SECURITY - // RTPS header - maxDataSize -= RTPSMESSAGE_HEADER_SIZE; - return maxDataSize; + if (length <= overhead) + { + return 0; + } + return length - overhead; } bool RTPSParticipantImpl::networkFactoryHasRegisteredTransports() const diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 873537a8da3..9304681085a 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -2942,7 +2942,7 @@ TEST(DDSStatus, sample_rejected_waitset) .disable_heartbeat_piggyback(true) .asynchronously(eprosima::fastdds::dds::PublishModeQosPolicyKind::ASYNCHRONOUS_PUBLISH_MODE) .add_throughput_controller_descriptor_to_pparams( // Be sure are sent in separate submessage each DATA. - eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO, 100, 50) + eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::FIFO, 300, 300) .init(); reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) diff --git a/test/unittest/rtps/history/CMakeLists.txt b/test/unittest/rtps/history/CMakeLists.txt index 73c78eceb21..7fbe6a31203 100644 --- a/test/unittest/rtps/history/CMakeLists.txt +++ b/test/unittest/rtps/history/CMakeLists.txt @@ -84,6 +84,8 @@ set(TOPICPAYLOADPOOLTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) +set(WRITERHISTORYTESTS_SOURCE WriterHistoryTests.cpp) + if(WIN32) add_definitions(-D_WIN32_WINNT=0x0601) endif() @@ -164,3 +166,20 @@ target_link_libraries(TopicPayloadPoolTests GTest::gtest ${CMAKE_DL_LIBS}) gtest_discover_tests(TopicPayloadPoolTests) + + + +add_executable(WriterHistoryTests ${WRITERHISTORYTESTS_SOURCE}) +target_compile_definitions(WriterHistoryTests PRIVATE + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$>,$>:__DEBUG> + $<$:__INTERNALDEBUG> # Internal debug activated. + ) +target_link_libraries(WriterHistoryTests + fastcdr + fastrtps + foonathan_memory + GTest::gtest + ${CMAKE_DL_LIBS}) +gtest_discover_tests(WriterHistoryTests) diff --git a/test/unittest/rtps/history/WriterHistoryTests.cpp b/test/unittest/rtps/history/WriterHistoryTests.cpp new file mode 100644 index 00000000000..39b2be7cd7b --- /dev/null +++ b/test/unittest/rtps/history/WriterHistoryTests.cpp @@ -0,0 +1,105 @@ +// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +using namespace testing; + +#define MAX_MESSAGE_SIZE 300 + +void cache_change_fragment( + uint32_t max_message_size, + uint32_t inline_qos_length, + bool expected_fragmentation) +{ + uint32_t domain_id = 0; + uint32_t initial_reserved_caches = 10; + std::string max_message_size_str = std::to_string(max_message_size); + + RTPSParticipantAttributes p_attr; + p_attr.properties.properties().emplace_back("fastdds.max_message_size", max_message_size_str); + RTPSParticipant* participant = RTPSDomain::createParticipant( + domain_id, true, p_attr); + + ASSERT_NE(participant, nullptr); + + HistoryAttributes h_attr; + h_attr.memoryPolicy = DYNAMIC_RESERVE_MEMORY_MODE; + h_attr.initialReservedCaches = initial_reserved_caches; + h_attr.payloadMaxSize = 250; + WriterHistory* history = new WriterHistory(h_attr); + + WriterAttributes w_attr; + RTPSWriter* writer = RTPSDomain::createRTPSWriter(participant, w_attr, history); + + ASSERT_NE(writer, nullptr); + + CacheChange_t* change = writer->new_change(ALIVE); + if (expected_fragmentation) + { + change->serializedPayload.length = 3 * max_message_size; + } + else + { + change->serializedPayload.length = max_message_size / 3; + } + change->inline_qos.length = inline_qos_length; + history->add_change(change); + + auto result = change->getFragmentSize(); + std::cout << "Fragment size: " << result << std::endl; + if (expected_fragmentation) + { + ASSERT_NE(result, 0); + } + else + { + ASSERT_EQ(result, 0); + } +} + +/** + * This test checks the fragment size calculation for a cache change depending on the inline qos length. + * The change.serializedPayload.length is set to 3 times the max_allowed_payload_size, so the fragment size should always be set. + * In case of an overflow in the attribute high_mark_for_frag_ the fragment size will not be set, which is an error. + */ +TEST(WriterHistoryTests, final_high_mark_for_frag_overflow) +{ + for (uint32_t inline_qos_length = 0; inline_qos_length < MAX_MESSAGE_SIZE; inline_qos_length += 40) + { + cache_change_fragment(MAX_MESSAGE_SIZE, inline_qos_length, true); + } +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}