Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure compliant publisher API #210

Merged
merged 3 commits into from
Jul 28, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 128 additions & 41 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1588,7 +1588,6 @@ static dds_qos_t * create_readwrite_qos(
dds_qset_writer_data_lifecycle(qos, false); /* disable autodispose */
switch (qos_policies->history) {
case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_HISTORY_UNKNOWN:
case RMW_QOS_POLICY_HISTORY_KEEP_LAST:
if (qos_policies->depth == RMW_QOS_POLICY_DEPTH_SYSTEM_DEFAULT) {
dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, 1);
Expand All @@ -1604,24 +1603,26 @@ static dds_qos_t * create_readwrite_qos(
case RMW_QOS_POLICY_HISTORY_KEEP_ALL:
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
break;
case RMW_QOS_POLICY_HISTORY_UNKNOWN:
return nullptr;
default:
rmw_cyclonedds_cpp::unreachable();
}
switch (qos_policies->reliability) {
case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
case RMW_QOS_POLICY_RELIABILITY_RELIABLE:
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
break;
case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT:
dds_qset_reliability(qos, DDS_RELIABILITY_BEST_EFFORT, 0);
break;
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
return nullptr;
default:
rmw_cyclonedds_cpp::unreachable();
}
switch (qos_policies->durability) {
case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_DURABILITY_UNKNOWN:
case RMW_QOS_POLICY_DURABILITY_VOLATILE:
dds_qset_durability(qos, DDS_DURABILITY_VOLATILE);
break;
Expand All @@ -1638,6 +1639,8 @@ static dds_qos_t * create_readwrite_qos(
DDS_LENGTH_UNLIMITED);
break;
}
case RMW_QOS_POLICY_DURABILITY_UNKNOWN:
return nullptr;
default:
rmw_cyclonedds_cpp::unreachable();
}
Expand All @@ -1659,12 +1662,13 @@ static dds_qos_t * create_readwrite_qos(
switch (qos_policies->liveliness) {
case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC:
case RMW_QOS_POLICY_LIVELINESS_UNKNOWN:
dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, ldur);
break;
case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC:
dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur);
break;
case RMW_QOS_POLICY_LIVELINESS_UNKNOWN:
return nullptr;
default:
rmw_cyclonedds_cpp::unreachable();
}
Expand Down Expand Up @@ -1902,33 +1906,40 @@ static rmw_publisher_t * create_publisher(
)
{
CddsPublisher * pub;
rmw_publisher_t * rmw_publisher;
if ((pub =
create_cdds_publisher(
dds_ppant, dds_pub, type_supports, topic_name,
qos_policies)) == nullptr)
{
goto fail_common_init;
return nullptr;
}
rmw_publisher = rmw_publisher_allocate();
RET_ALLOC_X(rmw_publisher, goto fail_publisher);
auto cleanup_cdds_publisher = rcpputils::make_scope_exit(
[pub]() {
if (dds_delete(pub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp", "failed to delete writer during error handling");
}
delete pub;
});

rmw_publisher_t * rmw_publisher = rmw_publisher_allocate();
RET_ALLOC_X(rmw_publisher, return nullptr);
auto cleanup_rmw_publisher = rcpputils::make_scope_exit(
[rmw_publisher]() {
rmw_free(const_cast<char *>(rmw_publisher->topic_name));
rmw_publisher_free(rmw_publisher);
});
rmw_publisher->implementation_identifier = eclipse_cyclonedds_identifier;
rmw_publisher->data = pub;
rmw_publisher->topic_name = reinterpret_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
RET_ALLOC_X(rmw_publisher->topic_name, goto fail_topic_name);
RET_ALLOC_X(rmw_publisher->topic_name, return nullptr);
memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);
rmw_publisher->options = *publisher_options;
rmw_publisher->can_loan_messages = false;

cleanup_rmw_publisher.cancel();
cleanup_cdds_publisher.cancel();
return rmw_publisher;
fail_topic_name:
rmw_publisher_free(rmw_publisher);
fail_publisher:
if (dds_delete(pub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling");
}
delete pub;
fail_common_init:
return nullptr;
}

extern "C" rmw_publisher_t * rmw_create_publisher(
Expand All @@ -1937,29 +1948,67 @@ extern "C" rmw_publisher_t * rmw_create_publisher(
const rmw_publisher_options_t * publisher_options
)
{
RET_WRONG_IMPLID_X(node, return nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eclipse_cyclonedds_identifier,
return nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
if (0 == strlen(topic_name)) {
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason);
return nullptr;
}
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

rmw_publisher_t * pub = create_publisher(
node->context->impl->ppant, node->context->impl->dds_pub,
type_supports, topic_name, qos_policies,
publisher_options);
if (pub != nullptr) {
// Update graph
auto common = &node->context->impl->common;
const auto cddspub = static_cast<const CddsPublisher *>(pub->data);
if (pub == nullptr) {
return nullptr;
}
auto cleanup_publisher = rcpputils::make_scope_exit(
[pub]() {
rmw_error_state_t error_state = *rmw_get_error_state();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Inverting the logic in L1978, setting an error message and returning NULL will make the logic a bit clearer.

rmw_reset_error();
if (RMW_RET_OK != destroy_publisher(pub)) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
});

// Update graph
auto common = &node->context->impl->common;
const auto cddspub = static_cast<const CddsPublisher *>(pub->data);
{
std::lock_guard<std::mutex> guard(common->node_update_mutex);
rmw_dds_common::msg::ParticipantEntitiesInfo msg =
common->graph_cache.associate_writer(cddspub->gid, common->gid, node->name, node->namespace_);
if (RMW_RET_OK != rmw_publish(
common->pub,
static_cast<void *>(&msg),
nullptr))
{
if (RMW_RET_OK != rmw_publish(common->pub, static_cast<void *>(&msg), nullptr)) {
static_cast<void>(common->graph_cache.dissociate_writer(
cddspub->gid, common->gid, node->name, node->namespace_));
static_cast<void>(destroy_publisher(pub));
return nullptr;
}
}

cleanup_publisher.cancel();
return pub;
}

Expand Down Expand Up @@ -2016,14 +2065,18 @@ rmw_ret_t rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)

rmw_ret_t rmw_publisher_get_actual_qos(const rmw_publisher_t * publisher, rmw_qos_profile_t * qos)
{
RET_NULL(qos);
RET_WRONG_IMPLID(publisher);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
auto pub = static_cast<CddsPublisher *>(publisher->data);
if (get_readwrite_qos(pub->enth, qos)) {
return RMW_RET_OK;
} else {
return RMW_RET_ERROR;
}
return RMW_RET_ERROR;
}

extern "C" rmw_ret_t rmw_borrow_loaned_message(
Expand Down Expand Up @@ -2051,23 +2104,37 @@ extern "C" rmw_ret_t rmw_return_loaned_message_from_publisher(

static rmw_ret_t destroy_publisher(rmw_publisher_t * publisher)
{
RET_WRONG_IMPLID(publisher);
rmw_ret_t ret = RMW_RET_OK;
auto pub = static_cast<CddsPublisher *>(publisher->data);
if (pub != nullptr) {
if (dds_delete(pub->enth) < 0) {
RMW_SET_ERROR_MSG("failed to delete writer");
ret = RMW_RET_ERROR;
}
delete pub;
}
rmw_free(const_cast<char *>(publisher->topic_name));
publisher->topic_name = nullptr;
rmw_publisher_free(publisher);
return RMW_RET_OK;
return ret;
}

extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
RET_WRONG_IMPLID(node);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

rmw_ret_t ret = RMW_RET_OK;
rmw_error_state_t error_state;
{
auto common = &node->context->impl->common;
const auto cddspub = static_cast<const CddsPublisher *>(publisher->data);
Expand All @@ -2076,12 +2143,32 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t *
common->graph_cache.dissociate_writer(
cddspub->gid, common->gid, node->name,
node->namespace_);
if (RMW_RET_OK != rmw_publish(common->pub, static_cast<void *>(&msg), nullptr)) {
RMW_SET_ERROR_MSG(
"failed to publish ParticipantEntitiesInfo message after dissociating writer");
rmw_ret_t publish_ret =
rmw_publish(common->pub, static_cast<void *>(&msg), nullptr);
if (RMW_RET_OK != publish_ret) {
error_state = *rmw_get_error_state();
ret = publish_ret;
rmw_reset_error();
}
}

rmw_ret_t inner_ret = destroy_publisher(publisher);
if (RMW_RET_OK != inner_ret) {
Copy link
Contributor Author

@hidmic hidmic Jul 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like having to do this to ensure the first error that occurs is the one that pops up and that no subsequent error goes unnoticed.

If we had a way to tell rcutils that we're in error handling or finalization/cleanup mode so that it doesn't complain about error states being overwritten (and prints the new one to stderr, or aborts the program, or whatever), this code would simpler (and a bit more neat). What do reviewers think?

Copy link
Member

@ivanpauno ivanpauno Jul 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current logic isn't super nice, but isn't wrong either.
Maybe directly logging all destruction errors and setting a generic error state at the end is a better idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current logic isn't super nice, but isn't wrong either.

No, it's not incorrect, but it's... messy.

Maybe directly logging all destruction errors and setting a generic error state at the end is a better idea.

We could do that, but that doesn't cover errors during cleanup. Besides, it's important to know what the actual error was to do something about it (or debug it), nevermind leaks. At any rate, I don't mind doing this. I just figured it'd much easier if rcutils took care of redirecting all RCUTILS_SET_ERROR_MSG() for a given scope. @clalancette @wjwwood @jacobperron for feedback too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do that, but that doesn't cover errors during cleanup

Do you mean the cleanup that a function does when it fails?
In that case it would be good to set an error message about what failed, and log all cleanup failures.
So logging is fine in that case too,

I just figured it'd much easier if rcutils took care of redirecting all RCUTILS_SET_ERROR_MSG() for a given scope

How?

Copy link
Contributor Author

@hidmic hidmic Jul 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance, having a flag in TLS that client code can toggle. For C++ implementations, we may be able to use RAII for easier control of the error handling scope. If done right it can reduce clutter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's fair. As I said elsewhere, I wonder about the usefulness of a return code if we log everything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return code would still be useful, as the caller can know that sth failed (it doesn't matter if we return the first error code or the last one).
The error string won't be useful if we log everything, yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about the usefulness of a return code if we log everything

Isn't the purpose of the return code so that the calling function knows something went wrong, and optionally if there is enough information in the return code to then make an intelligent decision about what to do about it? The logging is more for after-the-fact debugging so having everything logged doesn't seem bad to me.

Having an error string would also still be useful for when logging is turned down, I think.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not simply pass an RMW error state object around inside the implementation, with a setter that implements the behaviour of returning the first error and logging the rest? That's basically the pattern you have here with ret and inner_ret, but it'd remove the messiness from the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and optionally if there is enough information in the return code to then make an intelligent decision about what to do about it?

IMHO that's the whole point of having different, documented return codes, vs. a bool.

Why not simply pass an RMW error state object around inside the implementation, with a setter that implements the behaviour of returning the first error and logging the rest?

Yeah, I was trying not to (re)build this behavior for each RMW implementation, but to make it widely available. I'll probably push this as-is until we've reached consensus on how to do this nicely.

if (RMW_RET_OK != ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "'\n");
} else {
error_state = *rmw_get_error_state();
ret = inner_ret;
}
rmw_reset_error();
}
return destroy_publisher(publisher);

if (RMW_RET_OK != ret) {
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
}

return ret;
}


Expand Down