Skip to content

Commit

Permalink
Fix issue with exclusive ownership and unordered samples (#5182) (#5216)
Browse files Browse the repository at this point in the history
* Refs #20866. Regression test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20866. Additional regression test.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20866. Fix issue.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20866. Fix unit tests.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20866. Refactor test to run several cases.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit b1a7fe2)
Signed-off-by: JesusPoderoso <jesuspoderoso@eprosima.com>

# Conflicts:
#	test/unittest/dds/subscriber/DataReaderHistoryTests.cpp

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
mergify[bot] and MiguelCompany authored Sep 11, 2024
1 parent e004c2b commit 624575f
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 81 deletions.
26 changes: 26 additions & 0 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,13 +830,39 @@ bool DataReaderHistory::update_instance_nts(

assert(vit != instances_.end());
assert(false == change->isRead);
auto previous_owner = vit->second->current_owner.first;
++counters_.samples_unread;
bool ret =
vit->second->update_state(counters_, change->kind, change->writerGUID,
change->reader_info.writer_ownership_strength);
change->reader_info.disposed_generation_count = vit->second->disposed_generation_count;
change->reader_info.no_writers_generation_count = vit->second->no_writers_generation_count;

auto current_owner = vit->second->current_owner.first;
if (current_owner != previous_owner)
{
assert(current_owner == change->writerGUID);

// Remove all changes from different owners after the change.
DataReaderInstance::ChangeCollection& changes = vit->second->cache_changes;
auto it = std::lower_bound(changes.begin(), changes.end(), change, rtps::history_order_cmp);
assert(it != changes.end());
assert(*it == change);
++it;
while (it != changes.end())
{
if ((*it)->writerGUID != current_owner)
{
// Remove from history
remove_change_sub(*it, it);

// Current iterator will point to change next to the one removed. Avoid incrementing.
continue;
}
++it;
}
}

return ret;
}

Expand Down
136 changes: 136 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsOwnershipQos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,142 @@ TEST_P(OwnershipQos, exclusive_kind_keyed_besteffort_disposing_instance)
exclusive_kind_keyed_disposing_instance(false);
}

/*!
* This is a regression test for redmine issue 20866.
*
* This test checks that a reader keeping a long number of samples and with an exclusive ownership policy only
* returns the data from the writer with the highest strength.
*
* @param use_keep_all_history Whether to use KEEP_ALL history or KEEP_LAST(20).
* @param mixed_data Whether to send data from both writers in an interleaved way.
*/
static void test_exclusive_kind_big_history(
bool use_keep_all_history,
bool mixed_data)
{
PubSubReader<KeyedHelloWorldPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldPubSubType> low_strength_writer(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldPubSubType> high_strength_writer(TEST_TOPIC_NAME);

// Configure history QoS.
if (use_keep_all_history)
{
reader.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS);
}
else
{
reader.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
low_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
high_strength_writer.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS).history_depth(20);
}

// Prepare data.
std::list<KeyedHelloWorld> generated_data = default_keyedhelloworld_data_generator(20);
auto middle = std::next(generated_data.begin(), 10);
std::list<KeyedHelloWorld> low_strength_data(generated_data.begin(), middle);
std::list<KeyedHelloWorld> high_strength_data(middle, generated_data.end());
auto expected_data = high_strength_data;

if (mixed_data)
{
// Expect reception of the first two samples from the low strength writer (one per instance).
auto it = low_strength_data.begin();
expected_data.push_front(*it++);
expected_data.push_front(*it);
}

// Initialize writers.
low_strength_writer.ownership_strength(3)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.init();
ASSERT_TRUE(low_strength_writer.isInitialized());

// High strength writer will use a custom transport to ensure its data is received after the low strength data.
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
std::atomic<bool> drop_messages(false);
test_transport->messages_filter_ = [&drop_messages](eprosima::fastrtps::rtps::CDRMessage_t&)
{
return drop_messages.load();
};
high_strength_writer.ownership_strength(4)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.disable_builtin_transport()
.add_user_transport_to_pparams(test_transport)
.init();
ASSERT_TRUE(high_strength_writer.isInitialized());

// Initialize reader.
reader.ownership_exclusive()
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.init();
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery.
low_strength_writer.wait_discovery();
high_strength_writer.wait_discovery();
reader.wait_discovery(std::chrono::seconds::zero(), 2);

// Drop the messages from the high strength writer, so they arrive later to the reader.
drop_messages.store(true);

if (mixed_data)
{
// Send one sample from each writer, with low strength data first.
while (!low_strength_data.empty() && !high_strength_data.empty())
{
EXPECT_TRUE(low_strength_writer.send_sample(low_strength_data.front()));
EXPECT_TRUE(high_strength_writer.send_sample(high_strength_data.front()));
low_strength_data.pop_front();
high_strength_data.pop_front();
}
}
else
{
// Send high strength data first, so it has the lowest source timestamps, but drop the messages, so they arrive
// later to the reader.
high_strength_writer.send(high_strength_data);
EXPECT_TRUE(high_strength_data.empty());

// Send low strength data, so it has the highest source timestamps.
low_strength_writer.send(low_strength_data);
EXPECT_TRUE(low_strength_data.empty());
}

// Wait for the reader to receive the low strength data.
EXPECT_TRUE(low_strength_writer.waitForAllAcked(std::chrono::seconds(1)));

// Let high strength writer send the data, and wait for the reader to receive it.
drop_messages.store(false);
EXPECT_TRUE(high_strength_writer.waitForAllAcked(std::chrono::seconds(1)));

// Make the reader process the data, expecting only the required data.
// The issue was reproduced by the reader complaining about reception of unexpected data.
reader.startReception(expected_data);
reader.block_for_all();
}

TEST(OwnershipQos, exclusive_kind_keep_all_reliable)
{
test_exclusive_kind_big_history(true, false);
}

TEST(OwnershipQos, exclusive_kind_keep_all_reliable_mixed)
{
test_exclusive_kind_big_history(true, true);
}

TEST(OwnershipQos, exclusive_kind_keep_last_reliable)
{
test_exclusive_kind_big_history(false, false);
}

TEST(OwnershipQos, exclusive_kind_keep_last_reliable_mixed)
{
test_exclusive_kind_big_history(false, true);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading

0 comments on commit 624575f

Please sign in to comment.