diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index b01846d8..04b65194 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -342,19 +342,9 @@ struct user_callback_data_t rmw_event_callback_t callback {nullptr}; const void * user_data {nullptr}; size_t unread_count {0}; - rmw_event_callback_t event_callback[RMW_EVENT_INVALID + 1] {nullptr}; - const void * event_data[RMW_EVENT_INVALID + 1] {nullptr}; - size_t event_unread_count[RMW_EVENT_INVALID + 1] {0}; - std::mutex status_mutex; - // Save matched status from last time status was read - rmw_matched_status_t saved_matched_status {0, 0}; - bool is_matched_status_saved {false}; - // Save unmatched status from last time status was read - rmw_unmatched_status_t saved_unmatched_status {0, 0}; - bool is_unmatched_status_saved {false}; - // Save the last 'current_count_change' of dds matched event status - // This value is used to determine whether it is matched event or unmatched event - int32_t last_dds_matched_event_current_count_change {0}; + rmw_event_callback_t event_callback[DDS_STATUS_ID_MAX + 1] {nullptr}; + const void * event_data[DDS_STATUS_ID_MAX + 1] {nullptr}; + size_t event_unread_count[DDS_STATUS_ID_MAX + 1] {0}; }; struct CddsPublisher : CddsEntity @@ -499,77 +489,21 @@ static void dds_listener_callback(dds_entity_t entity, void * arg) } } -static inline rmw_event_type_t dds_event_to_rmw_event( - dds_status_id_t rmw_event, - const void * status, - user_callback_data_t * data) -{ - switch (rmw_event) { - // subscription events - case DDS_LIVELINESS_CHANGED_STATUS_ID: - return RMW_EVENT_LIVELINESS_CHANGED; - case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: - return RMW_EVENT_REQUESTED_DEADLINE_MISSED; - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: - return RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE; - case DDS_SAMPLE_LOST_STATUS_ID: - return RMW_EVENT_MESSAGE_LOST; - case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: { - rmw_event_type_t ret; - const dds_subscription_matched_status_t * s = - static_cast(status); - - ret = (s->current_count_change > data->last_dds_matched_event_current_count_change) ? - RMW_EVENT_SUBSCRIPTION_MATCHED : - RMW_EVENT_SUBSCRIPTION_UNMATCHED; - data->last_dds_matched_event_current_count_change = s->current_count_change; - - return ret; - } - - // publisher events - case DDS_LIVELINESS_LOST_STATUS_ID: - return RMW_EVENT_LIVELINESS_LOST; - case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: - return RMW_EVENT_OFFERED_DEADLINE_MISSED; - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: - return RMW_EVENT_OFFERED_QOS_INCOMPATIBLE; - case DDS_PUBLICATION_MATCHED_STATUS_ID: { - rmw_event_type_t ret; - const dds_publication_matched_status_t * s = - static_cast(status); - - ret = (s->current_count_change > data->last_dds_matched_event_current_count_change) ? - RMW_EVENT_PUBLICATION_MATCHED : - RMW_EVENT_PUBLICATION_UNMATCHED; - data->last_dds_matched_event_current_count_change = s->current_count_change; - - return ret; - } - - default: - return RMW_EVENT_INVALID; - } -} - #define MAKE_DDS_EVENT_CALLBACK_FN(event_type, EVENT_TYPE) \ static void on_ ## event_type ## _fn( \ dds_entity_t entity, \ const dds_ ## event_type ## _status_t status, \ void * arg) \ { \ + (void)status; \ (void)entity; \ auto data = static_cast(arg); \ std::lock_guard guard(data->mutex); \ - rmw_event_type_t event = dds_event_to_rmw_event( \ - DDS_ ## EVENT_TYPE ## _STATUS_ID, \ - static_cast(&status), \ - data); \ - auto cb = data->event_callback[event]; \ + auto cb = data->event_callback[DDS_ ## EVENT_TYPE ## _STATUS_ID]; \ if (cb) { \ - cb(data->event_data[event], 1); \ + cb(data->event_data[DDS_ ## EVENT_TYPE ## _STATUS_ID], 1); \ } else { \ - data->event_unread_count[event]++; \ + data->event_unread_count[DDS_ ## EVENT_TYPE ## _STATUS_ID]++; \ } \ } @@ -655,6 +589,7 @@ extern "C" rmw_ret_t rmw_service_set_on_new_request_callback( user_callback_data_t * data = &(srv->user_callback_data); std::lock_guard guard(data->mutex); + // Set the user callback data data->callback = callback; data->user_data = user_data; @@ -693,53 +628,121 @@ extern "C" rmw_ret_t rmw_client_set_on_new_response_callback( return RMW_RET_OK; } +template +static void event_set_callback( + T event, + dds_status_id_t status_id, + rmw_event_callback_t callback, + const void * user_data) +{ + user_callback_data_t * data = &(event->user_callback_data); + + std::lock_guard guard(data->mutex); + + // Set the user callback data + data->event_callback[status_id] = callback; + data->event_data[status_id] = user_data; + + if (callback && data->event_unread_count[status_id]) { + // Push events happened before having assigned a callback + callback(user_data, data->event_unread_count[status_id]); + data->event_unread_count[status_id] = 0; + } +} + extern "C" rmw_ret_t rmw_event_set_callback( rmw_event_t * rmw_event, rmw_event_callback_t callback, const void * user_data) { - CddsSubscription * sub_event; - CddsPublisher * pub_event; - user_callback_data_t * data; RMW_CHECK_ARGUMENT_FOR_NULL(rmw_event, RMW_RET_INVALID_ARGUMENT); - switch (rmw_event->event_type) { case RMW_EVENT_LIVELINESS_CHANGED: + { + auto sub_event = static_cast(rmw_event->data); + event_set_callback( + sub_event, DDS_LIVELINESS_CHANGED_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_REQUESTED_DEADLINE_MISSED: + { + auto sub_event = static_cast(rmw_event->data); + event_set_callback( + sub_event, DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE: + { + auto sub_event = static_cast(rmw_event->data); + event_set_callback( + sub_event, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_MESSAGE_LOST: + { + auto sub_event = static_cast(rmw_event->data); + event_set_callback( + sub_event, DDS_SAMPLE_LOST_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_SUBSCRIPTION_MATCHED: - case RMW_EVENT_SUBSCRIPTION_UNMATCHED: { - sub_event = static_cast(rmw_event->data); - data = &sub_event->user_callback_data; + auto sub_event = static_cast(rmw_event->data); + event_set_callback( + sub_event, DDS_SUBSCRIPTION_MATCHED_STATUS_ID, + callback, user_data); break; } + case RMW_EVENT_LIVELINESS_LOST: + { + auto pub_event = static_cast(rmw_event->data); + event_set_callback( + pub_event, DDS_LIVELINESS_LOST_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_OFFERED_DEADLINE_MISSED: + { + auto pub_event = static_cast(rmw_event->data); + event_set_callback( + pub_event, DDS_OFFERED_DEADLINE_MISSED_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE: + { + auto pub_event = static_cast(rmw_event->data); + event_set_callback( + pub_event, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_PUBLICATION_MATCHED: - case RMW_EVENT_PUBLICATION_UNMATCHED: { - pub_event = static_cast(rmw_event->data); - data = &pub_event->user_callback_data; + auto pub_event = static_cast(rmw_event->data); + event_set_callback( + pub_event, DDS_PUBLICATION_MATCHED_STATUS_ID, + callback, user_data); break; } + case RMW_EVENT_INVALID: { return RMW_RET_INVALID_ARGUMENT; } } - - // Set the user callback data - data->event_callback[rmw_event->event_type] = callback; - data->event_data[rmw_event->event_type] = user_data; - - if (callback && data->event_unread_count[rmw_event->event_type]) { - // Push events happened before having assigned a callback - callback(user_data, data->event_unread_count[rmw_event->event_type]); - data->event_unread_count[rmw_event->event_type] = 0; - } return RMW_RET_OK; } @@ -3647,9 +3650,7 @@ static const std::unordered_map mask_map{ {RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS}, {RMW_EVENT_MESSAGE_LOST, DDS_SAMPLE_LOST_STATUS}, {RMW_EVENT_SUBSCRIPTION_MATCHED, DDS_SUBSCRIPTION_MATCHED_STATUS}, - {RMW_EVENT_SUBSCRIPTION_UNMATCHED, DDS_SUBSCRIPTION_MATCHED_STATUS}, - {RMW_EVENT_PUBLICATION_MATCHED, DDS_PUBLICATION_MATCHED_STATUS}, - {RMW_EVENT_PUBLICATION_UNMATCHED, DDS_PUBLICATION_MATCHED_STATUS} + {RMW_EVENT_PUBLICATION_MATCHED, DDS_PUBLICATION_MATCHED_STATUS} }; static bool is_event_supported(const rmw_event_type_t event_t) @@ -3703,53 +3704,6 @@ extern "C" rmw_ret_t rmw_subscription_event_init( event_type); } -template -inline void take_matched_event( - DDS_MATCHED_STATUS & st, rmw_matched_status_t * ei, DDS_ENTITY_TYPE * entity) -{ - std::lock_guard lock(entity->user_callback_data.status_mutex); - ei->current_matched_count = st.current_count; - - if (entity->user_callback_data.is_matched_status_saved) { - ei->current_count_change = st.total_count_change + - entity->user_callback_data.saved_matched_status.current_count_change; - - entity->user_callback_data.is_matched_status_saved = false; - entity->user_callback_data.saved_matched_status.current_count_change = 0; - } else { - ei->current_count_change = st.total_count_change; - } - - // Save status for unmatched event since matched/unmatched depend on the same dds status - entity->user_callback_data.saved_unmatched_status.current_count_change += - st.total_count_change - st.current_count_change; - entity->user_callback_data.is_unmatched_status_saved = true; -} - -template -inline void take_unmatched_event( - DDS_MATCHED_STATUS & st, rmw_unmatched_status_t * ei, DDS_ENTITY_TYPE * entity) -{ - std::lock_guard lock(entity->user_callback_data.status_mutex); - ei->current_matched_count = st.current_count; - - if (entity->user_callback_data.is_unmatched_status_saved) { - ei->current_count_change = st.total_count_change - st.current_count_change + - entity->user_callback_data.saved_unmatched_status.current_count_change; - - entity->user_callback_data.is_unmatched_status_saved = false; - entity->user_callback_data.saved_unmatched_status.current_count_change = 0; - } else { - ei->current_count_change = st.total_count_change - st.current_count_change; - } - - // Save status for matched event since matched/unmatched depend on the same dds status - entity->user_callback_data.saved_matched_status.current_count_change += - st.total_count_change; - - entity->user_callback_data.is_matched_status_saved = true; -} - extern "C" rmw_ret_t rmw_take_event( const rmw_event_t * event_handle, void * event_info, bool * taken) @@ -3831,25 +3785,10 @@ extern "C" rmw_ret_t rmw_take_event( *taken = false; return RMW_RET_ERROR; } - // dds_get_subscription_matched_status() clear 'current_count_change' of internal dds - // matched status. So the variable that retain the last value also need to be cleaned up. - sub->user_callback_data.last_dds_matched_event_current_count_change = 0; - take_matched_event(st, ei, sub); - *taken = true; - return RMW_RET_OK; - } - case RMW_EVENT_SUBSCRIPTION_UNMATCHED: { - auto ei = static_cast(event_info); - auto sub = static_cast(event_handle->data); - - dds_subscription_matched_status_t st; - if (dds_get_subscription_matched_status(sub->enth, &st) < 0) { - *taken = false; - return RMW_RET_ERROR; - } - // dds_get_subscription_matched_status() clear 'current_count_change' of internal dds - // matched status. So the variable that retain the last value also need to be cleaned up. - take_unmatched_event(st, ei, sub); + ei->total_count = static_cast(st.total_count); + ei->total_count_change = static_cast(st.total_count_change); + ei->current_count = static_cast(st.current_count); + ei->current_count_change = static_cast(st.current_count_change); *taken = true; return RMW_RET_OK; } @@ -3910,26 +3849,10 @@ extern "C" rmw_ret_t rmw_take_event( *taken = false; return RMW_RET_ERROR; } - // dds_get_publication_matched_status() clear 'current_count_change' of internal dds - // matched status. So the variable that retain the last value also need to be cleaned up. - pub->user_callback_data.last_dds_matched_event_current_count_change = 0; - take_matched_event(st, ei, pub); - *taken = true; - return RMW_RET_OK; - } - case RMW_EVENT_PUBLICATION_UNMATCHED: { - auto ei = static_cast(event_info); - auto pub = static_cast(event_handle->data); - - dds_publication_matched_status st; - if (dds_get_publication_matched_status(pub->enth, &st) < 0) { - *taken = false; - return RMW_RET_ERROR; - } - // dds_get_publication_matched_status() clear 'current_count_change' of internal dds - // matched status. So the variable that retain the last value also need to be cleaned up. - pub->user_callback_data.last_dds_matched_event_current_count_change = 0; - take_unmatched_event(st, ei, pub); + ei->total_count = static_cast(st.total_count); + ei->total_count_change = static_cast(st.total_count_change); + ei->current_count = static_cast(st.current_count); + ei->current_count_change = static_cast(st.current_count_change); *taken = true; return RMW_RET_OK; }