Skip to content

Commit

Permalink
Updated to test rmw matched event
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Mar 15, 2023
1 parent 14c9ae3 commit 7c85a32
Showing 1 changed file with 75 additions and 154 deletions.
229 changes: 75 additions & 154 deletions test_rmw_implementation/test/test_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void event_callback(const void * user_data, size_t number_of_events)
}
}

TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_publisher_event_matched_unmatched) {
TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_publisher_matched_event) {
// Notice: Not support connextdds since it doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
Expand All @@ -140,38 +140,21 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_publisher_event_matched_u
RMW_RET_OK, rmw_event_fini(&pub_matched_event)) << rmw_get_error_string().str;
});

rmw_event_t pub_unmatched_event{rmw_get_zero_initialized_event()};
ret = rmw_publisher_event_init(&pub_unmatched_event, pub, RMW_EVENT_PUBLICATION_UNMATCHED);
ASSERT_EQ(RMW_RET_OK, ret);
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
EXPECT_EQ(
RMW_RET_OK, rmw_event_fini(&pub_unmatched_event)) << rmw_get_error_string().str;
});

struct EventUserData matched_data;
matched_data.event_count = std::make_shared<std::atomic_size_t>(0);
ret = rmw_event_set_callback(&pub_matched_event, event_callback, &matched_data);
ASSERT_EQ(RMW_RET_OK, ret);
struct EventUserData unmatched_data;
unmatched_data.event_count = std::make_shared<std::atomic_size_t>(0);
ret = rmw_event_set_callback(&pub_unmatched_event, event_callback, &unmatched_data);
ASSERT_EQ(RMW_RET_OK, ret);

// to take event if there is no subscription
{
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&pub_matched_event, &matched_status, &taken));
EXPECT_EQ(0, matched_status.current_matched_count);
EXPECT_EQ(0, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(0, matched_status.current_count);
EXPECT_EQ(0, matched_status.current_count_change);
EXPECT_TRUE(taken);

rmw_unmatched_status_t unmatched_status;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&pub_unmatched_event, &unmatched_status, &taken));
EXPECT_EQ(0, unmatched_status.current_matched_count);
EXPECT_EQ(0, unmatched_status.current_count_change);
EXPECT_TRUE(taken);
}

// test the matched event while a subscription coming
Expand All @@ -181,48 +164,47 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_publisher_event_matched_u

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*matched_data.event_count, 1);
EXPECT_EQ(*unmatched_data.event_count, 0);

rmw_subscription_t * sub2 =
rmw_create_subscription(node, ts, topic_name, &rmw_qos_profile_default, &sub_options);
ASSERT_NE(nullptr, sub2) << rmw_get_error_string().str;

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*matched_data.event_count, 2);
EXPECT_EQ(*unmatched_data.event_count, 0);

// wait matched event
rmw_matched_status_t matched_status;
wait_and_take_event(&pub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.current_matched_count);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(2, matched_status.total_count_change);
EXPECT_EQ(2, matched_status.current_count);
EXPECT_EQ(2, matched_status.current_count_change);

// Next, check unmatched event
// Next, check unmatched status change
*matched_data.event_count = 0;

// test the unmatched event while the subscription exiting
ret = rmw_destroy_subscription(node, sub1);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*unmatched_data.event_count, 1);
EXPECT_EQ(*matched_data.event_count, 0);
EXPECT_EQ(*matched_data.event_count, 1);

ret = rmw_destroy_subscription(node, sub2);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*unmatched_data.event_count, 2);
EXPECT_EQ(*matched_data.event_count, 0);

// wait unmatched event
rmw_unmatched_status_t unmatched_status;
wait_and_take_event(&pub_unmatched_event, &unmatched_status);
EXPECT_EQ(0, unmatched_status.current_matched_count);
EXPECT_EQ(2, unmatched_status.current_count_change);
EXPECT_EQ(*matched_data.event_count, 2);

// wait unmatched status change
wait_and_take_event(&pub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(0, matched_status.current_count);
EXPECT_EQ(-2, matched_status.current_count_change);
}

TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_subscription_event_matched_unmatched) {
TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_subscription_matched_event) {
// Notice: Not support connextdds since it doesn't support rmw_event_set_callback() interface
if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
GTEST_SKIP();
Expand All @@ -249,39 +231,21 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_subscription_event_matche
RMW_RET_OK, rmw_event_fini(&sub_matched_event)) << rmw_get_error_string().str;
});

rmw_event_t sub_unmatched_event{rmw_get_zero_initialized_event()};
ret = rmw_subscription_event_init(&sub_unmatched_event, sub, RMW_EVENT_SUBSCRIPTION_UNMATCHED);
ASSERT_EQ(RMW_RET_OK, ret);
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
EXPECT_EQ(
RMW_RET_OK, rmw_event_fini(&sub_unmatched_event)) << rmw_get_error_string().str;
});

struct EventUserData matched_data;
matched_data.event_count = std::make_shared<std::atomic_size_t>(0);
ret = rmw_event_set_callback(&sub_matched_event, event_callback, &matched_data);
ASSERT_EQ(RMW_RET_OK, ret);

struct EventUserData unmatched_data;
unmatched_data.event_count = std::make_shared<std::atomic_size_t>(0);
ret = rmw_event_set_callback(&sub_unmatched_event, event_callback, &unmatched_data);
ASSERT_EQ(RMW_RET_OK, ret);

// to take event if there is no publisher
{
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&sub_matched_event, &matched_status, &taken));
EXPECT_EQ(0, matched_status.current_matched_count);
EXPECT_EQ(0, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(0, matched_status.current_count);
EXPECT_EQ(0, matched_status.current_count_change);
EXPECT_TRUE(taken);

rmw_unmatched_status_t unmatched_status;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&sub_unmatched_event, &unmatched_status, &taken));
EXPECT_EQ(0, unmatched_status.current_matched_count);
EXPECT_EQ(0, unmatched_status.current_count_change);
EXPECT_TRUE(taken);
}

// test the matched event while a publisher coming
Expand All @@ -291,48 +255,47 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), basic_subscription_event_matche

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*matched_data.event_count, 1);
EXPECT_EQ(*unmatched_data.event_count, 0);

rmw_publisher_t * pub2 =
rmw_create_publisher(node, ts, topic_name, &rmw_qos_profile_default, &pub_options);
ASSERT_NE(nullptr, pub2) << rmw_get_error_string().str;

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*matched_data.event_count, 2);
EXPECT_EQ(*unmatched_data.event_count, 0);

// wait matched event
rmw_matched_status_t matched_status;
wait_and_take_event(&sub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.current_matched_count);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(2, matched_status.total_count_change);
EXPECT_EQ(2, matched_status.current_count);
EXPECT_EQ(2, matched_status.current_count_change);

// Next, check unmatched event
// Next, check unmatched status change
*matched_data.event_count = 0;

// test the unmatched event while the publisher exiting
ret = rmw_destroy_publisher(node, pub1);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*matched_data.event_count, 0);
EXPECT_EQ(*unmatched_data.event_count, 1);
EXPECT_EQ(*matched_data.event_count, 1);

ret = rmw_destroy_publisher(node, pub2);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;

std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(*matched_data.event_count, 0);
EXPECT_EQ(*unmatched_data.event_count, 2);

// wait unmatched event
rmw_unmatched_status_t unmatched_status;
wait_and_take_event(&sub_unmatched_event, &unmatched_status);
EXPECT_EQ(0, unmatched_status.current_matched_count);
EXPECT_EQ(2, unmatched_status.current_count_change);
EXPECT_EQ(*matched_data.event_count, 2);

// wait unmatched status change
wait_and_take_event(&sub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(0, matched_status.current_count);
EXPECT_EQ(-2, matched_status.current_count_change);
}

TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_pub_multi_sub_matched_unmatched_event) {
TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_pub_multi_sub_connect_disconnect) {
rmw_publisher_t * pub =
rmw_create_publisher(node, ts, topic_name, &rmw_qos_profile_default, &pub_options);
ASSERT_NE(nullptr, pub) << rmw_get_error_string().str;
Expand All @@ -351,15 +314,6 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_pub_multi_sub_matched_unmat
RMW_RET_OK, rmw_event_fini(&pub_matched_event)) << rmw_get_error_string().str;
});

rmw_event_t pub_unmatched_event{rmw_get_zero_initialized_event()};
ret = rmw_publisher_event_init(&pub_unmatched_event, pub, RMW_EVENT_PUBLICATION_UNMATCHED);
ASSERT_EQ(RMW_RET_OK, ret);
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
EXPECT_EQ(
RMW_RET_OK, rmw_event_fini(&pub_unmatched_event)) << rmw_get_error_string().str;
});

// test the matched event while a subscription coming
rmw_subscription_t * sub1 =
rmw_create_subscription(node, ts, topic_name, &rmw_qos_profile_default, &sub_options);
Expand All @@ -371,45 +325,33 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_pub_multi_sub_matched_unmat

std::this_thread::sleep_for(std::chrono::milliseconds(100));

{
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&pub_matched_event, &matched_status, &taken));
EXPECT_EQ(2, matched_status.current_matched_count);
EXPECT_EQ(2, matched_status.current_count_change);
EXPECT_TRUE(taken);

rmw_unmatched_status_t unmatched_status;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&pub_unmatched_event, &unmatched_status, &taken));
EXPECT_EQ(2, unmatched_status.current_matched_count);
EXPECT_EQ(0, unmatched_status.current_count_change);
EXPECT_TRUE(taken);
}
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&pub_matched_event, &matched_status, &taken));
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(2, matched_status.total_count_change);
EXPECT_EQ(2, matched_status.current_count);
EXPECT_EQ(2, matched_status.current_count_change);
EXPECT_TRUE(taken);

// test the unmatched event while the subscription exiting
// test the unmatched status change while the subscription exiting
ret = rmw_destroy_subscription(node, sub1);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;

// wait unmatched event
rmw_unmatched_status_t unmatched_status;
wait_and_take_event(&pub_unmatched_event, &unmatched_status);
EXPECT_EQ(1, unmatched_status.current_matched_count);
EXPECT_EQ(1, unmatched_status.current_count_change);
// wait unmatched status change
wait_and_take_event(&pub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(1, matched_status.current_count);
EXPECT_EQ(-1, matched_status.current_count_change);

ret = rmw_destroy_subscription(node, sub2);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
wait_and_take_event(&pub_unmatched_event, &unmatched_status);
EXPECT_EQ(0, unmatched_status.current_matched_count);
EXPECT_EQ(1, unmatched_status.current_count_change);

{
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&pub_matched_event, &matched_status, &taken));
EXPECT_EQ(0, matched_status.current_matched_count);
EXPECT_EQ(0, matched_status.current_count_change);
EXPECT_TRUE(taken);
}
wait_and_take_event(&pub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(0, matched_status.current_count);
EXPECT_EQ(-1, matched_status.current_count_change);
}

TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_sub_multi_pub_matched_unmatched_event) {
Expand All @@ -434,15 +376,6 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_sub_multi_pub_matched_unmat
RMW_RET_OK, rmw_event_fini(&sub_matched_event)) << rmw_get_error_string().str;
});

rmw_event_t sub_unmatched_event{rmw_get_zero_initialized_event()};
ret = rmw_subscription_event_init(&sub_unmatched_event, sub, RMW_EVENT_SUBSCRIPTION_UNMATCHED);
ASSERT_EQ(RMW_RET_OK, ret);
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
EXPECT_EQ(
RMW_RET_OK, rmw_event_fini(&sub_unmatched_event)) << rmw_get_error_string().str;
});

// test the matched event while a publisher coming
rmw_publisher_t * pub1 =
rmw_create_publisher(node, ts, topic_name, &rmw_qos_profile_default, &pub_options);
Expand All @@ -454,43 +387,31 @@ TEST_F(CLASSNAME(TestEvent, RMW_IMPLEMENTATION), one_sub_multi_pub_matched_unmat

std::this_thread::sleep_for(std::chrono::milliseconds(100));

{
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&sub_matched_event, &matched_status, &taken));
EXPECT_EQ(2, matched_status.current_matched_count);
EXPECT_EQ(2, matched_status.current_count_change);
EXPECT_TRUE(taken);

rmw_unmatched_status_t unmatched_status;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&sub_unmatched_event, &unmatched_status, &taken));
EXPECT_EQ(2, unmatched_status.current_matched_count);
EXPECT_EQ(0, unmatched_status.current_count_change);
EXPECT_TRUE(taken);
}
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&sub_matched_event, &matched_status, &taken));
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(2, matched_status.total_count_change);
EXPECT_EQ(2, matched_status.current_count);
EXPECT_EQ(2, matched_status.current_count_change);
EXPECT_TRUE(taken);

// test the unmatched event while the publisher exiting
// test the unmatched status change while the publisher exiting
ret = rmw_destroy_publisher(node, pub1);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;

// wait unmatched event
rmw_unmatched_status_t unmatched_status;
wait_and_take_event(&sub_unmatched_event, &unmatched_status);
EXPECT_EQ(1, unmatched_status.current_matched_count);
EXPECT_EQ(1, unmatched_status.current_count_change);
// wait unmatched status change
wait_and_take_event(&sub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(1, matched_status.current_count);
EXPECT_EQ(-1, matched_status.current_count_change);

ret = rmw_destroy_publisher(node, pub2);
EXPECT_EQ(RMW_RET_OK, ret) << rmw_get_error_string().str;
wait_and_take_event(&sub_unmatched_event, &unmatched_status);
EXPECT_EQ(0, unmatched_status.current_matched_count);
EXPECT_EQ(1, unmatched_status.current_count_change);

{
rmw_matched_status_t matched_status;
bool taken;
EXPECT_EQ(RMW_RET_OK, rmw_take_event(&sub_matched_event, &matched_status, &taken));
EXPECT_EQ(0, matched_status.current_matched_count);
EXPECT_EQ(0, matched_status.current_count_change);
EXPECT_TRUE(taken);
}
wait_and_take_event(&sub_matched_event, &matched_status);
EXPECT_EQ(2, matched_status.total_count);
EXPECT_EQ(0, matched_status.total_count_change);
EXPECT_EQ(0, matched_status.current_count);
EXPECT_EQ(-1, matched_status.current_count_change);
}

0 comments on commit 7c85a32

Please sign in to comment.