Skip to content

Commit

Permalink
Updated to only support rmw matched event
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Mar 15, 2023
1 parent 3326aa7 commit 054bca7
Showing 1 changed file with 105 additions and 182 deletions.
287 changes: 105 additions & 182 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,19 +342,9 @@ struct user_callback_data_t
rmw_event_callback_t callback {nullptr};
const void * user_data {nullptr};
size_t unread_count {0};
rmw_event_callback_t event_callback[RMW_EVENT_INVALID + 1] {nullptr};
const void * event_data[RMW_EVENT_INVALID + 1] {nullptr};
size_t event_unread_count[RMW_EVENT_INVALID + 1] {0};
std::mutex status_mutex;
// Save matched status from last time status was read
rmw_matched_status_t saved_matched_status {0, 0};
bool is_matched_status_saved {false};
// Save unmatched status from last time status was read
rmw_unmatched_status_t saved_unmatched_status {0, 0};
bool is_unmatched_status_saved {false};
// Save the last 'current_count_change' of dds matched event status
// This value is used to determine whether it is matched event or unmatched event
int32_t last_dds_matched_event_current_count_change {0};
rmw_event_callback_t event_callback[DDS_STATUS_ID_MAX + 1] {nullptr};
const void * event_data[DDS_STATUS_ID_MAX + 1] {nullptr};
size_t event_unread_count[DDS_STATUS_ID_MAX + 1] {0};
};

struct CddsPublisher : CddsEntity
Expand Down Expand Up @@ -499,77 +489,21 @@ static void dds_listener_callback(dds_entity_t entity, void * arg)
}
}

static inline rmw_event_type_t dds_event_to_rmw_event(
dds_status_id_t rmw_event,
const void * status,
user_callback_data_t * data)
{
switch (rmw_event) {
// subscription events
case DDS_LIVELINESS_CHANGED_STATUS_ID:
return RMW_EVENT_LIVELINESS_CHANGED;
case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID:
return RMW_EVENT_REQUESTED_DEADLINE_MISSED;
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID:
return RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE;
case DDS_SAMPLE_LOST_STATUS_ID:
return RMW_EVENT_MESSAGE_LOST;
case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: {
rmw_event_type_t ret;
const dds_subscription_matched_status_t * s =
static_cast<const dds_subscription_matched_status_t *>(status);

ret = (s->current_count_change > data->last_dds_matched_event_current_count_change) ?
RMW_EVENT_SUBSCRIPTION_MATCHED :
RMW_EVENT_SUBSCRIPTION_UNMATCHED;
data->last_dds_matched_event_current_count_change = s->current_count_change;

return ret;
}

// publisher events
case DDS_LIVELINESS_LOST_STATUS_ID:
return RMW_EVENT_LIVELINESS_LOST;
case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID:
return RMW_EVENT_OFFERED_DEADLINE_MISSED;
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID:
return RMW_EVENT_OFFERED_QOS_INCOMPATIBLE;
case DDS_PUBLICATION_MATCHED_STATUS_ID: {
rmw_event_type_t ret;
const dds_publication_matched_status_t * s =
static_cast<const dds_publication_matched_status_t *>(status);

ret = (s->current_count_change > data->last_dds_matched_event_current_count_change) ?
RMW_EVENT_PUBLICATION_MATCHED :
RMW_EVENT_PUBLICATION_UNMATCHED;
data->last_dds_matched_event_current_count_change = s->current_count_change;

return ret;
}

default:
return RMW_EVENT_INVALID;
}
}

#define MAKE_DDS_EVENT_CALLBACK_FN(event_type, EVENT_TYPE) \
static void on_ ## event_type ## _fn( \
dds_entity_t entity, \
const dds_ ## event_type ## _status_t status, \
void * arg) \
{ \
(void)status; \
(void)entity; \
auto data = static_cast<user_callback_data_t *>(arg); \
std::lock_guard<std::mutex> guard(data->mutex); \
rmw_event_type_t event = dds_event_to_rmw_event( \
DDS_ ## EVENT_TYPE ## _STATUS_ID, \
static_cast<const void *>(&status), \
data); \
auto cb = data->event_callback[event]; \
auto cb = data->event_callback[DDS_ ## EVENT_TYPE ## _STATUS_ID]; \
if (cb) { \
cb(data->event_data[event], 1); \
cb(data->event_data[DDS_ ## EVENT_TYPE ## _STATUS_ID], 1); \
} else { \
data->event_unread_count[event]++; \
data->event_unread_count[DDS_ ## EVENT_TYPE ## _STATUS_ID]++; \
} \
}

Expand Down Expand Up @@ -655,6 +589,7 @@ extern "C" rmw_ret_t rmw_service_set_on_new_request_callback(
user_callback_data_t * data = &(srv->user_callback_data);

std::lock_guard<std::mutex> guard(data->mutex);

// Set the user callback data
data->callback = callback;
data->user_data = user_data;
Expand Down Expand Up @@ -693,53 +628,121 @@ extern "C" rmw_ret_t rmw_client_set_on_new_response_callback(
return RMW_RET_OK;
}

template<typename T>
static void event_set_callback(
T event,
dds_status_id_t status_id,
rmw_event_callback_t callback,
const void * user_data)
{
user_callback_data_t * data = &(event->user_callback_data);

std::lock_guard<std::mutex> guard(data->mutex);

// Set the user callback data
data->event_callback[status_id] = callback;
data->event_data[status_id] = user_data;

if (callback && data->event_unread_count[status_id]) {
// Push events happened before having assigned a callback
callback(user_data, data->event_unread_count[status_id]);
data->event_unread_count[status_id] = 0;
}
}

extern "C" rmw_ret_t rmw_event_set_callback(
rmw_event_t * rmw_event,
rmw_event_callback_t callback,
const void * user_data)
{
CddsSubscription * sub_event;
CddsPublisher * pub_event;
user_callback_data_t * data;
RMW_CHECK_ARGUMENT_FOR_NULL(rmw_event, RMW_RET_INVALID_ARGUMENT);

switch (rmw_event->event_type) {
case RMW_EVENT_LIVELINESS_CHANGED:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_LIVELINESS_CHANGED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_MESSAGE_LOST:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_SAMPLE_LOST_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_SUBSCRIPTION_MATCHED:
case RMW_EVENT_SUBSCRIPTION_UNMATCHED:
{
sub_event = static_cast<CddsSubscription *>(rmw_event->data);
data = &sub_event->user_callback_data;
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_SUBSCRIPTION_MATCHED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_LIVELINESS_LOST:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_LIVELINESS_LOST_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_OFFERED_DEADLINE_MISSED:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_OFFERED_DEADLINE_MISSED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_PUBLICATION_MATCHED:
case RMW_EVENT_PUBLICATION_UNMATCHED:
{
pub_event = static_cast<CddsPublisher *>(rmw_event->data);
data = &pub_event->user_callback_data;
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_PUBLICATION_MATCHED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_INVALID:
{
return RMW_RET_INVALID_ARGUMENT;
}
}

// Set the user callback data
data->event_callback[rmw_event->event_type] = callback;
data->event_data[rmw_event->event_type] = user_data;

if (callback && data->event_unread_count[rmw_event->event_type]) {
// Push events happened before having assigned a callback
callback(user_data, data->event_unread_count[rmw_event->event_type]);
data->event_unread_count[rmw_event->event_type] = 0;
}
return RMW_RET_OK;
}

Expand Down Expand Up @@ -3647,9 +3650,7 @@ static const std::unordered_map<rmw_event_type_t, uint32_t> mask_map{
{RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS},
{RMW_EVENT_MESSAGE_LOST, DDS_SAMPLE_LOST_STATUS},
{RMW_EVENT_SUBSCRIPTION_MATCHED, DDS_SUBSCRIPTION_MATCHED_STATUS},
{RMW_EVENT_SUBSCRIPTION_UNMATCHED, DDS_SUBSCRIPTION_MATCHED_STATUS},
{RMW_EVENT_PUBLICATION_MATCHED, DDS_PUBLICATION_MATCHED_STATUS},
{RMW_EVENT_PUBLICATION_UNMATCHED, DDS_PUBLICATION_MATCHED_STATUS}
{RMW_EVENT_PUBLICATION_MATCHED, DDS_PUBLICATION_MATCHED_STATUS}
};

static bool is_event_supported(const rmw_event_type_t event_t)
Expand Down Expand Up @@ -3703,53 +3704,6 @@ extern "C" rmw_ret_t rmw_subscription_event_init(
event_type);
}

template<typename DDS_MATCHED_STATUS, typename DDS_ENTITY_TYPE>
inline void take_matched_event(
DDS_MATCHED_STATUS & st, rmw_matched_status_t * ei, DDS_ENTITY_TYPE * entity)
{
std::lock_guard<std::mutex> lock(entity->user_callback_data.status_mutex);
ei->current_matched_count = st.current_count;

if (entity->user_callback_data.is_matched_status_saved) {
ei->current_count_change = st.total_count_change +
entity->user_callback_data.saved_matched_status.current_count_change;

entity->user_callback_data.is_matched_status_saved = false;
entity->user_callback_data.saved_matched_status.current_count_change = 0;
} else {
ei->current_count_change = st.total_count_change;
}

// Save status for unmatched event since matched/unmatched depend on the same dds status
entity->user_callback_data.saved_unmatched_status.current_count_change +=
st.total_count_change - st.current_count_change;
entity->user_callback_data.is_unmatched_status_saved = true;
}

template<typename DDS_MATCHED_STATUS, typename DDS_ENTITY_TYPE>
inline void take_unmatched_event(
DDS_MATCHED_STATUS & st, rmw_unmatched_status_t * ei, DDS_ENTITY_TYPE * entity)
{
std::lock_guard<std::mutex> lock(entity->user_callback_data.status_mutex);
ei->current_matched_count = st.current_count;

if (entity->user_callback_data.is_unmatched_status_saved) {
ei->current_count_change = st.total_count_change - st.current_count_change +
entity->user_callback_data.saved_unmatched_status.current_count_change;

entity->user_callback_data.is_unmatched_status_saved = false;
entity->user_callback_data.saved_unmatched_status.current_count_change = 0;
} else {
ei->current_count_change = st.total_count_change - st.current_count_change;
}

// Save status for matched event since matched/unmatched depend on the same dds status
entity->user_callback_data.saved_matched_status.current_count_change +=
st.total_count_change;

entity->user_callback_data.is_matched_status_saved = true;
}

extern "C" rmw_ret_t rmw_take_event(
const rmw_event_t * event_handle, void * event_info,
bool * taken)
Expand Down Expand Up @@ -3831,25 +3785,10 @@ extern "C" rmw_ret_t rmw_take_event(
*taken = false;
return RMW_RET_ERROR;
}
// dds_get_subscription_matched_status() clear 'current_count_change' of internal dds
// matched status. So the variable that retain the last value also need to be cleaned up.
sub->user_callback_data.last_dds_matched_event_current_count_change = 0;
take_matched_event(st, ei, sub);
*taken = true;
return RMW_RET_OK;
}
case RMW_EVENT_SUBSCRIPTION_UNMATCHED: {
auto ei = static_cast<rmw_unmatched_status_t *>(event_info);
auto sub = static_cast<CddsSubscription *>(event_handle->data);

dds_subscription_matched_status_t st;
if (dds_get_subscription_matched_status(sub->enth, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
}
// dds_get_subscription_matched_status() clear 'current_count_change' of internal dds
// matched status. So the variable that retain the last value also need to be cleaned up.
take_unmatched_event(st, ei, sub);
ei->total_count = static_cast<size_t>(st.total_count);
ei->total_count_change = static_cast<size_t>(st.total_count_change);
ei->current_count = static_cast<size_t>(st.current_count);
ei->current_count_change = static_cast<size_t>(st.current_count_change);
*taken = true;
return RMW_RET_OK;
}
Expand Down Expand Up @@ -3910,26 +3849,10 @@ extern "C" rmw_ret_t rmw_take_event(
*taken = false;
return RMW_RET_ERROR;
}
// dds_get_publication_matched_status() clear 'current_count_change' of internal dds
// matched status. So the variable that retain the last value also need to be cleaned up.
pub->user_callback_data.last_dds_matched_event_current_count_change = 0;
take_matched_event(st, ei, pub);
*taken = true;
return RMW_RET_OK;
}
case RMW_EVENT_PUBLICATION_UNMATCHED: {
auto ei = static_cast<rmw_unmatched_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data);

dds_publication_matched_status st;
if (dds_get_publication_matched_status(pub->enth, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
}
// dds_get_publication_matched_status() clear 'current_count_change' of internal dds
// matched status. So the variable that retain the last value also need to be cleaned up.
pub->user_callback_data.last_dds_matched_event_current_count_change = 0;
take_unmatched_event(st, ei, pub);
ei->total_count = static_cast<size_t>(st.total_count);
ei->total_count_change = static_cast<size_t>(st.total_count_change);
ei->current_count = static_cast<size_t>(st.current_count);
ei->current_count_change = static_cast<size_t>(st.current_count_change);
*taken = true;
return RMW_RET_OK;
}
Expand Down

0 comments on commit 054bca7

Please sign in to comment.