From f13df4c79ed867488e607251a25c784643724a3e Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 22 Mar 2023 21:35:10 +0800 Subject: [PATCH] Implement matched event (#645) Signed-off-by: Barry Xu --- .../custom_publisher_info.hpp | 13 +++ .../custom_subscriber_info.hpp | 15 +++ .../src/custom_publisher_info.cpp | 63 +++++++++++- .../src/custom_subscriber_info.cpp | 99 ++++++++++++++----- rmw_fastrtps_shared_cpp/src/rmw_event.cpp | 8 ++ 5 files changed, 170 insertions(+), 28 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 56d275096..f3023f72b 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -152,6 +152,13 @@ class RMWPublisherEvent final : public EventListenerInterface eprosima::fastdds::dds::QosPolicyId_t last_policy_id, uint32_t total_count, uint32_t total_count_change); + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void update_matched( + int32_t total_count, + int32_t total_count_change, + int32_t current_count, + int32_t current_count_change); + private: CustomPublisherInfo * publisher_info_ = nullptr; @@ -178,6 +185,12 @@ class RMWPublisherEvent final : public EventListenerInterface eprosima::fastdds::dds::OfferedIncompatibleQosStatus incompatible_qos_status_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + bool matched_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + eprosima::fastdds::dds::PublicationMatchedStatus matched_status_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + void trigger_event(rmw_event_type_t event_type); }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 2eba28ae3..3b1010b73 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -196,6 +196,13 @@ class RMWSubscriptionEvent final : public EventListenerInterface eprosima::fastdds::dds::QosPolicyId_t last_policy_id, uint32_t total_count, uint32_t total_count_change); + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void update_matched( + int32_t total_count, + int32_t total_count_change, + int32_t current_count, + int32_t current_count_change); + private: CustomSubscriberInfo * subscriber_info_ = nullptr; @@ -223,6 +230,12 @@ class RMWSubscriptionEvent final : public EventListenerInterface eprosima::fastdds::dds::RequestedIncompatibleQosStatus incompatible_qos_status_ RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + bool matched_changes_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + + eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status_ + RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_); + std::set publishers_ RCPPUTILS_TSA_GUARDED_BY( publishers_mutex_); @@ -233,6 +246,8 @@ class RMWSubscriptionEvent final : public EventListenerInterface std::mutex on_new_message_m_; mutable std::mutex publishers_mutex_; + + void trigger_event(rmw_event_type_t event_type); }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index fa0432858..b2563c52b 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -44,7 +44,15 @@ void CustomDataWriterListener::on_publication_matched( } else if (status.current_count_change == -1) { publisher_event_->untrack_unique_subscription( eprosima::fastrtps::rtps::iHandle2GUID(status.last_subscription_handle)); + } else { + return; } + + publisher_event_->update_matched( + status.total_count, + status.total_count_change, + status.current_count, + status.current_count_change); } @@ -82,7 +90,8 @@ RMWPublisherEvent::RMWPublisherEvent(CustomPublisherInfo * info) : publisher_info_(info), deadline_changed_(false), liveliness_changed_(false), - incompatible_qos_changed_(false) + incompatible_qos_changed_(false), + matched_changes_(false) { } @@ -158,6 +167,30 @@ bool RMWPublisherEvent::take_event( inconsistent_topic_status_.total_count_change = 0; } break; + case RMW_EVENT_PUBLICATION_MATCHED: + { + auto rmw_data = static_cast(event_info); + + if (matched_changes_) { + rmw_data->total_count = static_cast(matched_status_.total_count); + rmw_data->total_count_change = static_cast(matched_status_.total_count_change); + rmw_data->current_count = static_cast(matched_status_.current_count); + rmw_data->current_count_change = matched_status_.current_count_change; + matched_changes_ = false; + } else { + eprosima::fastdds::dds::PublicationMatchedStatus matched_status; + publisher_info_->data_writer_->get_publication_matched_status(matched_status); + + rmw_data->total_count = static_cast(matched_status.total_count); + rmw_data->total_count_change = static_cast(matched_status.total_count_change); + rmw_data->current_count = static_cast(matched_status.current_count); + rmw_data->current_count_change = matched_status.current_count_change; + } + + matched_status_.total_count_change = 0; + matched_status_.current_count_change = 0; + } + break; default: return false; } @@ -212,6 +245,16 @@ void RMWPublisherEvent::set_on_new_event_callback( inconsistent_topic_status_.total_count_change = 0; } break; + case RMW_EVENT_PUBLICATION_MATCHED: + { + if (matched_status_.total_count_change > 0) { + callback(user_data, matched_status_.total_count_change); + publisher_info_->data_writer_->get_publication_matched_status(matched_status_); + matched_status_.total_count_change = 0; + matched_status_.current_count_change = 0; + } + } + break; default: break; } @@ -307,6 +350,24 @@ void RMWPublisherEvent::update_inconsistent_topic(uint32_t total_count, uint32_t trigger_event(RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE); } +void RMWPublisherEvent::update_matched( + int32_t total_count, + int32_t total_count_change, + int32_t current_count, + int32_t current_count_change) +{ + std::lock_guard lock(on_new_event_m_); + + matched_status_.total_count = total_count; + matched_status_.total_count_change += total_count_change; + matched_status_.current_count = current_count; + matched_status_.current_count_change += current_count_change; + + matched_changes_ = true; + + trigger_event(RMW_EVENT_PUBLICATION_MATCHED); +} + void RMWPublisherEvent::trigger_event(rmw_event_type_t event_type) { if (on_new_event_cb_[event_type]) { diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 61450f348..284b1bb84 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -44,7 +44,15 @@ CustomDataReaderListener::on_subscription_matched( } else if (info.current_count_change == -1) { subscription_event_->untrack_unique_publisher( eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle)); + } else { + return; } + + subscription_event_->update_matched( + info.total_count, + info.total_count_change, + info.current_count, + info.current_count_change); } void @@ -102,7 +110,8 @@ RMWSubscriptionEvent::RMWSubscriptionEvent(CustomSubscriberInfo * info) deadline_changed_(false), liveliness_changed_(false), sample_lost_changed_(false), - incompatible_qos_changed_(false) + incompatible_qos_changed_(false), + matched_changes_(false) { } @@ -193,6 +202,29 @@ bool RMWSubscriptionEvent::take_event( inconsistent_topic_status_.total_count_change = 0; } break; + case RMW_EVENT_SUBSCRIPTION_MATCHED: + { + auto rmw_data = static_cast(event_info); + + if (matched_changes_) { + rmw_data->total_count = static_cast(matched_status_.total_count); + rmw_data->total_count_change = static_cast(matched_status_.total_count_change); + rmw_data->current_count = static_cast(matched_status_.current_count); + rmw_data->current_count_change = matched_status_.current_count_change; + matched_changes_ = false; + } else { + eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status; + subscriber_info_->data_reader_->get_subscription_matched_status(matched_status); + + rmw_data->total_count = static_cast(matched_status.total_count); + rmw_data->total_count_change = static_cast(matched_status.total_count_change); + rmw_data->current_count = static_cast(matched_status.current_count); + rmw_data->current_count_change = matched_status.current_count_change; + } + matched_status_.total_count_change = 0; + matched_status_.current_count_change = 0; + } + break; default: return false; } @@ -270,6 +302,15 @@ void RMWSubscriptionEvent::set_on_new_event_callback( } } break; + case RMW_EVENT_SUBSCRIPTION_MATCHED: + { + if (matched_status_.total_count_change > 0) { + callback(user_data, matched_status_.total_count_change); + subscriber_info_->data_reader_->get_subscription_matched_status(matched_status_); + matched_status_.total_count_change = 0; + matched_status_.current_count_change = 0; + } + } default: break; } @@ -368,12 +409,7 @@ void RMWSubscriptionEvent::update_requested_deadline_missed( deadline_changed_ = true; - if (on_new_event_cb_[RMW_EVENT_REQUESTED_DEADLINE_MISSED]) { - on_new_event_cb_[RMW_EVENT_REQUESTED_DEADLINE_MISSED](user_data_[ - RMW_EVENT_REQUESTED_DEADLINE_MISSED], 1); - } - - event_guard[RMW_EVENT_REQUESTED_DEADLINE_MISSED].set_trigger_value(true); + trigger_event(RMW_EVENT_REQUESTED_DEADLINE_MISSED); } void RMWSubscriptionEvent::update_liveliness_changed( @@ -391,11 +427,7 @@ void RMWSubscriptionEvent::update_liveliness_changed( liveliness_changed_ = true; - if (on_new_event_cb_[RMW_EVENT_LIVELINESS_CHANGED]) { - on_new_event_cb_[RMW_EVENT_LIVELINESS_CHANGED](user_data_[RMW_EVENT_LIVELINESS_CHANGED], 1); - } - - event_guard[RMW_EVENT_LIVELINESS_CHANGED].set_trigger_value(true); + trigger_event(RMW_EVENT_LIVELINESS_CHANGED); } void RMWSubscriptionEvent::update_sample_lost(uint32_t total_count, uint32_t total_count_change) @@ -409,11 +441,7 @@ void RMWSubscriptionEvent::update_sample_lost(uint32_t total_count, uint32_t tot sample_lost_changed_ = true; - if (on_new_event_cb_[RMW_EVENT_MESSAGE_LOST]) { - on_new_event_cb_[RMW_EVENT_MESSAGE_LOST](user_data_[RMW_EVENT_MESSAGE_LOST], 1); - } - - event_guard[RMW_EVENT_MESSAGE_LOST].set_trigger_value(true); + trigger_event(RMW_EVENT_MESSAGE_LOST); } void RMWSubscriptionEvent::update_requested_incompatible_qos( @@ -430,12 +458,7 @@ void RMWSubscriptionEvent::update_requested_incompatible_qos( incompatible_qos_changed_ = true; - if (on_new_event_cb_[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE]) { - on_new_event_cb_[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE](user_data_[ - RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE], 1); - } - - event_guard[RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE].set_trigger_value(true); + trigger_event(RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE); } void RMWSubscriptionEvent::update_inconsistent_topic( @@ -450,10 +473,32 @@ void RMWSubscriptionEvent::update_inconsistent_topic( inconsistent_topic_changed_ = true; - if (on_new_event_cb_[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE]) { - on_new_event_cb_[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE]( - user_data_[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE], 1); + trigger_event(RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE); +} + +void RMWSubscriptionEvent::update_matched( + int32_t total_count, + int32_t total_count_change, + int32_t current_count, + int32_t current_count_change) +{ + std::lock_guard lock(on_new_event_m_); + + matched_status_.total_count = total_count; + matched_status_.total_count_change += total_count_change; + matched_status_.current_count = current_count; + matched_status_.current_count_change += current_count_change; + + matched_changes_ = true; + + trigger_event(RMW_EVENT_SUBSCRIPTION_MATCHED); +} + +void RMWSubscriptionEvent::trigger_event(rmw_event_type_t event_type) +{ + if (on_new_event_cb_[event_type]) { + on_new_event_cb_[event_type](user_data_[event_type], 1); } - event_guard[RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE].set_trigger_value(true); + event_guard[event_type].set_trigger_value(true); } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 545f4a5a0..0aa43a473 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -31,6 +31,8 @@ static const std::unordered_set g_rmw_event_type_set{ RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, + RMW_EVENT_SUBSCRIPTION_MATCHED, + RMW_EVENT_PUBLICATION_MATCHED }; namespace rmw_fastrtps_shared_cpp @@ -70,6 +72,12 @@ eprosima::fastdds::dds::StatusMask rmw_event_to_dds_statusmask( case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE: ret_statusmask = eprosima::fastdds::dds::StatusMask::inconsistent_topic(); break; + case RMW_EVENT_SUBSCRIPTION_MATCHED: + ret_statusmask = eprosima::fastdds::dds::StatusMask::subscription_matched(); + break; + case RMW_EVENT_PUBLICATION_MATCHED: + ret_statusmask = eprosima::fastdds::dds::StatusMask::publication_matched(); + break; default: break; }