Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix matched event issues #683

Merged
merged 8 commits into from
Apr 12, 2023
29 changes: 15 additions & 14 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,18 @@ bool RMWPublisherEvent::take_event(
{
auto rmw_data = static_cast<rmw_matched_status_t *>(event_info);

eprosima::fastdds::dds::PublicationMatchedStatus matched_status;
publisher_info_->data_writer_->get_publication_matched_status(matched_status);

rmw_data->total_count = static_cast<size_t>(matched_status.total_count);
rmw_data->current_count = static_cast<size_t>(matched_status.current_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status.total_count_change);
rmw_data->current_count_change = matched_status.current_count_change;

if (matched_changes_) {
rmw_data->total_count = static_cast<size_t>(matched_status_.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status_.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status_.current_count);
rmw_data->current_count_change = matched_status_.current_count_change;
rmw_data->total_count_change += static_cast<size_t>(matched_status_.total_count_change);
rmw_data->current_count_change += matched_status_.current_count_change;
matched_changes_ = false;
} else {
eprosima::fastdds::dds::PublicationMatchedStatus matched_status;
publisher_info_->data_writer_->get_publication_matched_status(matched_status);

rmw_data->total_count = static_cast<size_t>(matched_status.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status.current_count);
rmw_data->current_count_change = matched_status.current_count_change;
}

matched_status_.total_count_change = 0;
Expand Down Expand Up @@ -267,7 +265,11 @@ void RMWPublisherEvent::set_on_new_event_callback(
user_data_[event_type] = nullptr;
on_new_event_cb_[event_type] = nullptr;

status_mask &= ~rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type);
// publication_matched status should be kept enabled, since we need to
// keep tracking matched subscriptions
if (RMW_EVENT_PUBLICATION_MATCHED != event_type) {
status_mask &= ~rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type);
}
}

publisher_info_->data_writer_->set_listener(publisher_info_->data_writer_listener_, status_mask);
Expand Down Expand Up @@ -362,7 +364,6 @@ void RMWPublisherEvent::update_matched(
matched_status_.total_count_change += total_count_change;
matched_status_.current_count = current_count;
matched_status_.current_count_change += current_count_change;

matched_changes_ = true;

trigger_event(RMW_EVENT_PUBLICATION_MATCHED);
Expand Down
30 changes: 16 additions & 14 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,21 +206,20 @@ bool RMWSubscriptionEvent::take_event(
{
auto rmw_data = static_cast<rmw_matched_status_t *>(event_info);

eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status;
subscriber_info_->data_reader_->get_subscription_matched_status(matched_status);

rmw_data->total_count = static_cast<size_t>(matched_status.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status.current_count);
rmw_data->current_count_change = matched_status.current_count_change;

if (matched_changes_) {
rmw_data->total_count = static_cast<size_t>(matched_status_.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status_.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status_.current_count);
rmw_data->current_count_change = matched_status_.current_count_change;
rmw_data->total_count_change += static_cast<size_t>(matched_status_.total_count_change);
rmw_data->current_count_change += matched_status_.current_count_change;
matched_changes_ = false;
} else {
eprosima::fastdds::dds::SubscriptionMatchedStatus matched_status;
subscriber_info_->data_reader_->get_subscription_matched_status(matched_status);

rmw_data->total_count = static_cast<size_t>(matched_status.total_count);
rmw_data->total_count_change = static_cast<size_t>(matched_status.total_count_change);
rmw_data->current_count = static_cast<size_t>(matched_status.current_count);
rmw_data->current_count_change = matched_status.current_count_change;
}

matched_status_.total_count_change = 0;
matched_status_.current_count_change = 0;
}
Expand Down Expand Up @@ -323,7 +322,11 @@ void RMWSubscriptionEvent::set_on_new_event_callback(
user_data_[event_type] = nullptr;
on_new_event_cb_[event_type] = nullptr;

status_mask &= ~rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type);
// subscription_matched status should be kept enabled, since we need to
// keep tracking matched publications
if (RMW_EVENT_SUBSCRIPTION_MATCHED != event_type) {
status_mask &= ~rmw_fastrtps_shared_cpp::internal::rmw_event_to_dds_statusmask(event_type);
}
}

subscriber_info_->data_reader_->set_listener(
Expand Down Expand Up @@ -488,7 +491,6 @@ void RMWSubscriptionEvent::update_matched(
matched_status_.total_count_change += total_count_change;
matched_status_.current_count = current_count;
matched_status_.current_count_change += current_count_change;

matched_changes_ = true;

trigger_event(RMW_EVENT_SUBSCRIPTION_MATCHED);
Expand Down