Skip to content

Commit

Permalink
Add matched/unmatched event support
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Feb 15, 2023
1 parent 4bec4c8 commit 984ed63
Showing 1 changed file with 229 additions and 79 deletions.
308 changes: 229 additions & 79 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,18 @@ struct user_callback_data_t
rmw_event_callback_t callback {nullptr};
const void * user_data {nullptr};
size_t unread_count {0};
rmw_event_callback_t event_callback[DDS_STATUS_ID_MAX + 1] {nullptr};
const void * event_data[DDS_STATUS_ID_MAX + 1] {nullptr};
size_t event_unread_count[DDS_STATUS_ID_MAX + 1] {0};
rmw_event_callback_t event_callback[RMW_EVENT_INVALID + 1] {nullptr};
const void * event_data[RMW_EVENT_INVALID + 1] {nullptr};
size_t event_unread_count[RMW_EVENT_INVALID + 1] {0};
std::mutex status_mutex;
// Save matched status from last time status was read
rmw_matched_status_t saved_matched_status {0, 0};
bool is_matched_status_saved {false};
// Save unmatched status from last time status was read
rmw_unmatched_status_t saved_unmatched_status {0, 0};
bool is_unmatched_status_saved {false};
// Save previous matched status to judge matched or unmatched.
uint32_t pre_matched_event_current_count{0};
};

struct CddsPublisher : CddsEntity
Expand Down Expand Up @@ -489,21 +498,79 @@ static void dds_listener_callback(dds_entity_t entity, void * arg)
}
}

static inline rmw_event_type_t dds_event_to_rmw_event(
dds_status_id_t rmw_event,
const void * status,
user_callback_data_t * data)
{
switch (rmw_event) {
// subscription events
case DDS_LIVELINESS_CHANGED_STATUS_ID:
return RMW_EVENT_LIVELINESS_CHANGED;
case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID:
return RMW_EVENT_REQUESTED_DEADLINE_MISSED;
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID:
return RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE;
case DDS_SAMPLE_LOST_STATUS_ID:
return RMW_EVENT_MESSAGE_LOST;
case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: {
rmw_event_type_t ret;
const dds_subscription_matched_status_t * s =
static_cast<const dds_subscription_matched_status_t *>(status);

ret = (s->current_count_change < 0) ?
RMW_EVENT_SUBSCRIPTION_UNMATCHED :
RMW_EVENT_SUBSCRIPTION_MATCHED;

data->pre_matched_event_current_count =
static_cast<const dds_subscription_matched_status_t *>(status)->current_count;
return ret;
}

// publisher events
case DDS_LIVELINESS_LOST_STATUS_ID:
return RMW_EVENT_LIVELINESS_LOST;
case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID:
return RMW_EVENT_OFFERED_DEADLINE_MISSED;
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID:
return RMW_EVENT_OFFERED_QOS_INCOMPATIBLE;
case DDS_PUBLICATION_MATCHED_STATUS_ID: {
rmw_event_type_t ret;
const dds_publication_matched_status_t * s =
static_cast<const dds_publication_matched_status_t *>(status);

ret = (s->current_count_change < 0) ?
RMW_EVENT_PUBLICATION_UNMATCHED :
RMW_EVENT_PUBLICATION_MATCHED;

data->pre_matched_event_current_count =
static_cast<const dds_subscription_matched_status_t *>(status)->current_count;
return ret;
}

default:
return RMW_EVENT_INVALID;
}
}

#define MAKE_DDS_EVENT_CALLBACK_FN(event_type, EVENT_TYPE) \
static void on_ ## event_type ## _fn( \
dds_entity_t entity, \
const dds_ ## event_type ## _status_t status, \
void * arg) \
{ \
(void)status; \
(void)entity; \
auto data = static_cast<user_callback_data_t *>(arg); \
std::lock_guard<std::mutex> guard(data->mutex); \
auto cb = data->event_callback[DDS_ ## EVENT_TYPE ## _STATUS_ID]; \
rmw_event_type_t event = dds_event_to_rmw_event( \
DDS_ ## EVENT_TYPE ## _STATUS_ID, \
static_cast<const void *>(&status), \
data); \
auto cb = data->event_callback[event]; \
if (cb) { \
cb(data->event_data[DDS_ ## EVENT_TYPE ## _STATUS_ID], 1); \
cb(data->event_data[event], 1); \
} else { \
data->event_unread_count[DDS_ ## EVENT_TYPE ## _STATUS_ID]++; \
data->event_unread_count[event]++; \
} \
}

Expand All @@ -515,6 +582,8 @@ MAKE_DDS_EVENT_CALLBACK_FN(requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QO
MAKE_DDS_EVENT_CALLBACK_FN(sample_lost, SAMPLE_LOST)
MAKE_DDS_EVENT_CALLBACK_FN(offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS)
MAKE_DDS_EVENT_CALLBACK_FN(liveliness_changed, LIVELINESS_CHANGED)
MAKE_DDS_EVENT_CALLBACK_FN(subscription_matched, SUBSCRIPTION_MATCHED)
MAKE_DDS_EVENT_CALLBACK_FN(publication_matched, PUBLICATION_MATCHED)

static void listener_set_event_callbacks(dds_listener_t * l, void * arg)
{
Expand All @@ -525,6 +594,8 @@ static void listener_set_event_callbacks(dds_listener_t * l, void * arg)
dds_lset_offered_deadline_missed_arg(l, on_offered_deadline_missed_fn, arg, false);
dds_lset_offered_incompatible_qos_arg(l, on_offered_incompatible_qos_fn, arg, false);
dds_lset_liveliness_changed_arg(l, on_liveliness_changed_fn, arg, false);
dds_lset_subscription_matched_arg(l, on_subscription_matched_fn, arg, false);
dds_lset_publication_matched_arg(l, on_publication_matched_fn, arg, false);
}

static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * rmw_qos_policies)
Expand Down Expand Up @@ -624,103 +695,53 @@ extern "C" rmw_ret_t rmw_client_set_on_new_response_callback(
return RMW_RET_OK;
}

template<typename T>
static void event_set_callback(
T event,
dds_status_id_t status_id,
rmw_event_callback_t callback,
const void * user_data)
{
user_callback_data_t * data = &(event->user_callback_data);

std::lock_guard<std::mutex> guard(data->mutex);

// Set the user callback data
data->event_callback[status_id] = callback;
data->event_data[status_id] = user_data;

if (callback && data->event_unread_count[status_id]) {
// Push events happened before having assigned a callback
callback(user_data, data->event_unread_count[status_id]);
data->event_unread_count[status_id] = 0;
}
}

extern "C" rmw_ret_t rmw_event_set_callback(
rmw_event_t * rmw_event,
rmw_event_callback_t callback,
const void * user_data)
{
CddsSubscription * sub_event;
CddsPublisher * pub_event;
user_callback_data_t * data;
RMW_CHECK_ARGUMENT_FOR_NULL(rmw_event, RMW_RET_INVALID_ARGUMENT);

switch (rmw_event->event_type) {
case RMW_EVENT_LIVELINESS_CHANGED:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_LIVELINESS_CHANGED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_MESSAGE_LOST:
case RMW_EVENT_SUBSCRIPTION_MATCHED:
case RMW_EVENT_SUBSCRIPTION_UNMATCHED:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_SAMPLE_LOST_STATUS_ID,
callback, user_data);
sub_event = static_cast<CddsSubscription *>(rmw_event->data);
data = &sub_event->user_callback_data;
break;
}

case RMW_EVENT_LIVELINESS_LOST:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_LIVELINESS_LOST_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_OFFERED_DEADLINE_MISSED:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_OFFERED_DEADLINE_MISSED_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_OFFERED_QOS_INCOMPATIBLE:
case RMW_EVENT_PUBLICATION_MATCHED:
case RMW_EVENT_PUBLICATION_UNMATCHED:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID,
callback, user_data);
pub_event = static_cast<CddsPublisher *>(rmw_event->data);
data = &pub_event->user_callback_data;
break;
}

case RMW_EVENT_INVALID:
{
return RMW_RET_INVALID_ARGUMENT;
}
}

// Set the user callback data
data->event_callback[rmw_event->event_type] = callback;
data->event_data[rmw_event->event_type] = user_data;

if (callback && data->event_unread_count[rmw_event->event_type]) {
// Push events happened before having assigned a callback
callback(user_data, data->event_unread_count[rmw_event->event_type]);
data->event_unread_count[rmw_event->event_type] = 0;
}
return RMW_RET_OK;
}

Expand Down Expand Up @@ -3627,6 +3648,10 @@ static const std::unordered_map<rmw_event_type_t, uint32_t> mask_map{
{RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS},
{RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS},
{RMW_EVENT_MESSAGE_LOST, DDS_SAMPLE_LOST_STATUS},
{RMW_EVENT_SUBSCRIPTION_MATCHED, DDS_SUBSCRIPTION_MATCHED_STATUS},
{RMW_EVENT_SUBSCRIPTION_UNMATCHED, DDS_SUBSCRIPTION_MATCHED_STATUS},
{RMW_EVENT_PUBLICATION_MATCHED, DDS_PUBLICATION_MATCHED_STATUS},
{RMW_EVENT_PUBLICATION_UNMATCHED, DDS_PUBLICATION_MATCHED_STATUS}
};

static bool is_event_supported(const rmw_event_type_t event_t)
Expand Down Expand Up @@ -3752,6 +3777,70 @@ extern "C" rmw_ret_t rmw_take_event(
return RMW_RET_OK;
}

case RMW_EVENT_SUBSCRIPTION_MATCHED: {
auto ei = static_cast<rmw_matched_status_t *>(event_info);
auto sub = static_cast<CddsSubscription *>(event_handle->data);

dds_subscription_matched_status_t st;
if (dds_get_subscription_matched_status(sub->enth, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
}

std::lock_guard<std::mutex> lock(sub->user_callback_data.status_mutex);
ei->current_matched_count = st.current_count;

if (sub->user_callback_data.is_matched_status_saved) {
ei->current_count_change = st.total_count_change +
sub->user_callback_data.saved_matched_status.current_count_change;

sub->user_callback_data.is_matched_status_saved = false;
sub->user_callback_data.saved_matched_status.current_count_change = 0;
} else {
ei->current_count_change = st.total_count_change;
}

// Save status for unmatched event since matched/unmatched depend on the same dds status
sub->user_callback_data.saved_unmatched_status.current_count_change +=
st.total_count_change - st.current_count_change;
sub->user_callback_data.is_unmatched_status_saved = true;

*taken = true;
return RMW_RET_OK;
}
case RMW_EVENT_SUBSCRIPTION_UNMATCHED: {
auto ei = static_cast<rmw_matched_status_t *>(event_info);
auto sub = static_cast<CddsSubscription *>(event_handle->data);

dds_subscription_matched_status_t st;
if (dds_get_subscription_matched_status(sub->enth, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
}

std::lock_guard<std::mutex> lock(sub->user_callback_data.status_mutex);
ei->current_matched_count = st.current_count;

if (sub->user_callback_data.is_unmatched_status_saved) {
ei->current_count_change = st.total_count_change - st.current_count_change +
sub->user_callback_data.saved_unmatched_status.current_count_change;

sub->user_callback_data.is_unmatched_status_saved = false;
sub->user_callback_data.saved_unmatched_status.current_count_change = 0;
} else {
ei->current_count_change = st.total_count_change - st.current_count_change;
}

// Save status for matched event since matched/unmatched depend on the same dds status
sub->user_callback_data.saved_matched_status.current_count_change +=
st.total_count_change;

sub->user_callback_data.is_matched_status_saved = true;

*taken = true;
return RMW_RET_OK;
}

case RMW_EVENT_LIVELINESS_LOST: {
auto ei = static_cast<rmw_liveliness_lost_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data);
Expand Down Expand Up @@ -3799,6 +3888,67 @@ extern "C" rmw_ret_t rmw_take_event(
}
}

case RMW_EVENT_PUBLICATION_MATCHED: {
auto ei = static_cast<rmw_matched_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data);

dds_publication_matched_status st;
if (dds_get_publication_matched_status(pub->enth, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
}

std::lock_guard<std::mutex> lock(pub->user_callback_data.status_mutex);
ei->current_matched_count = st.current_count;
if (pub->user_callback_data.is_matched_status_saved) {
ei->current_count_change = st.total_count_change +
pub->user_callback_data.saved_matched_status.current_count_change;

pub->user_callback_data.is_matched_status_saved = false;
pub->user_callback_data.saved_matched_status.current_count_change = 0;
} else {
ei->current_count_change = st.total_count_change;
}

// Save status for unmatched event since matched/unmatched depend on the same dds status
pub->user_callback_data.saved_unmatched_status.current_count_change +=
st.total_count_change - st.current_count_change;
pub->user_callback_data.is_unmatched_status_saved = true;

*taken = true;
return RMW_RET_OK;
}
case RMW_EVENT_PUBLICATION_UNMATCHED: {
auto ei = static_cast<rmw_matched_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data);

dds_publication_matched_status st;
if (dds_get_publication_matched_status(pub->enth, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
}

std::lock_guard<std::mutex> lock(pub->user_callback_data.status_mutex);
ei->current_matched_count = st.current_count;
if (pub->user_callback_data.is_unmatched_status_saved) {
ei->current_count_change = st.total_count_change - st.current_count_change +
pub->user_callback_data.saved_unmatched_status.current_count_change;

pub->user_callback_data.is_unmatched_status_saved = false;
pub->user_callback_data.saved_unmatched_status.current_count_change = 0;
} else {
ei->current_count_change = st.total_count_change - st.current_count_change;
}

// Save status for matched event since matched/unmatched depend on the same dds status
pub->user_callback_data.saved_matched_status.current_count_change +=
st.total_count_change;
pub->user_callback_data.is_matched_status_saved = true;

*taken = true;
return RMW_RET_OK;
}

case RMW_EVENT_INVALID: {
break;
}
Expand Down

0 comments on commit 984ed63

Please sign in to comment.