Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure compliant publisher API. #414

Merged
merged 4 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw_fastrtps_shared_cpp/create_rmw_gid.hpp"
#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
Expand Down Expand Up @@ -49,22 +52,30 @@ rmw_fastrtps_cpp::create_publisher(
bool keyed,
bool create_publisher_listener)
{
if (!participant_info) {
RMW_SET_ERROR_MSG("participant_info is null");
return nullptr;
}
if (!topic_name || strlen(topic_name) == 0) {
RMW_SET_ERROR_MSG("publisher topic is null or empty string");
RMW_CHECK_ARGUMENT_FOR_NULL(participant_info, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
clalancette marked this conversation as resolved.
Show resolved Hide resolved
if (0 == strlen(topic_name)) {
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
return nullptr;
}
if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_policies is null");
return nullptr;
}
if (!publisher_options) {
RMW_SET_ERROR_MSG("publisher_options is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason);
return nullptr;
}
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

Participant * participant = participant_info->participant;
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

const rosidl_message_type_support_t * type_support = get_message_typesupport_handle(
type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
Expand Down Expand Up @@ -93,23 +104,30 @@ rmw_fastrtps_cpp::create_publisher(
RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo");
return nullptr;
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->type_support_) {
_unregister_type(participant, info->type_support_);
}
delete info->listener_;
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
delete info;
});
info->typesupport_identifier_ = type_support->typesupport_identifier;
info->type_support_impl_ = type_support->data;

auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
std::string type_name = _create_type_name(callbacks);
if (
!Domain::getRegisteredType(
participant_info->participant, type_name.c_str(),
participant, type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->type_support_)))
{
info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("Failed to allocate MessageTypeSupport");
goto fail;
RMW_SET_ERROR_MSG("failed to allocate MessageTypeSupport");
return nullptr;
}
_register_type(participant_info->participant, info->type_support_);
_register_type(participant, info->type_support_);
}

if (!participant_info->leave_middleware_default_qos) {
Expand All @@ -124,26 +142,25 @@ rmw_fastrtps_cpp::create_publisher(
publisherParam.topic.topicName = _create_topic_name(qos_policies, ros_topic_prefix, topic_name);

if (!get_datawriter_qos(*qos_policies, publisherParam)) {
RMW_SET_ERROR_MSG("failed to get datawriter qos");
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
goto fail;
return nullptr;
}

info->listener_ = nullptr;
if (create_publisher_listener) {
info->listener_ = new (std::nothrow) PubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener");
goto fail;
return nullptr;
}
}

info->publisher_ = Domain::createPublisher(
participant_info->participant,
participant,
publisherParam,
info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
return nullptr;
}

info->publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
Expand All @@ -152,30 +169,27 @@ rmw_fastrtps_cpp::create_publisher(
rmw_publisher = rmw_publisher_allocate();
if (!rmw_publisher) {
RMW_SET_ERROR_MSG("failed to allocate publisher");
goto fail;
return nullptr;
}
auto cleanup_publisher = rcpputils::make_scope_exit(
[rmw_publisher]() {
rmw_free(const_cast<char *>(rmw_publisher->topic_name));
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;
rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));

rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
if (!rmw_publisher->topic_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher topic name");
goto fail;
return nullptr;
}

memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);

rmw_publisher->options = *publisher_options;

cleanup_publisher.cancel();
cleanup_info.cancel();
return rmw_publisher;

fail:
if (info) {
delete info->type_support_;
delete info->listener_;
delete info;
}
rmw_publisher_free(rmw_publisher);

return nullptr;
}
42 changes: 32 additions & 10 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "rmw/error_handling.h"
#include "rmw/rmw.h"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
Expand Down Expand Up @@ -63,15 +65,12 @@ rmw_create_publisher(
const rmw_qos_profile_t * qos_policies,
const rmw_publisher_options_t * publisher_options)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
return nullptr;
}

if (node->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return nullptr);

rmw_publisher_t * publisher = rmw_fastrtps_cpp::create_publisher(
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info),
Expand Down Expand Up @@ -101,8 +100,18 @@ rmw_create_publisher(
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
eprosima_fastrtps_identifier, node, publisher);
if (RMW_RET_OK != rmw_ret) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like having to do this to ensure the first error that occurs is the one that pops up and that no subsequent error goes unnoticed.

If we had a way to tell rcutils that we're in error handling or finalization/cleanup mode so that it doesn't complain about error states being overwritten (and prints the new one to stderr, or aborts the program, or whatever), this code would simpler (and a bit more neat). What do reviewers think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment than in the rmw_cyclonedds PR.
I think that for destruction, logging everywhere and setting a generic error message at the end if an error happened might be the simpler approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'd be simpler, yeah, but I think we would be trading usability in the process. To the point it makes me wonder what 's the point of returning an generic error code vs. not returning anything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is a bit unfortunate. The big problem is that you have to be careful on every error path to do the checking and not overwrite the error. It will always be a game of whack-a-mole (or we have to have really good tests).

That'd be simpler, yeah, but I think we would be trading usability in the process. To the point it makes me wonder what 's the point of returning an generic error code vs. not returning anything.

To be fair, the generic error code at least distinguishes between "succeeded" and "failed". The question on whether the individual errors are worthwhile comes down to whether the calling code can do anything reasonable in response to the error. If that caller can do something specific in response to some errors, then it makes sense to have individual errors. If the caller can never do anything about the situation, the generic error is enough.

The problem in my mind with the rcutils flag is that it is not generic enough; you also need that flag at the rcl layer, rmw layer, and potentially the rclcpp/rclpy layers as well.

Overall, I think the error situation isn't perfect. But I also think it is a large topic, and we don't necessarily have to solve it here. I'd say that we continue with this PR assuming the current situation. We can open up a separate issue and potentially have a meeting where we talk about the broader issue of errors. Does that sound reasonable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be fair, the generic error code at least distinguishes between "succeeded" and "failed".

True, but we don't need specific return codes for that.

The problem in my mind with the rcutils flag is that it is not generic enough; you also need that flag at the rcl layer, rmw layer, and potentially the rclcpp/rclpy layers as well.

Why? It's the same flag everywhere. It doesn't have to be a flag either, it's just that building anything bigger than this seems overkill.

We can open up a separate issue and potentially have a meeting where we talk about the broader issue of errors. Does that sound reasonable?

Fair enough. Let's roll as-is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? It's the same flag everywhere. It doesn't have to be a flag either, it's just that building anything bigger than this seems overkill.

I'm thinking of this situation:

int rcl_do_thing() {
  rcl_subthing_init();  // Succeeds
  if (rcutils_do_something() != OK) {
    // Failed, rcutils_do_something() set error code/message
    rcl_subthing_cleanup();  // Also failed, reset error code/message
    return NOT_OK;
  }
}

But its possible that I don't have enough context on how the error handling works.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, all layers use the same thread local error state and API that's defined in rcutils but through aliases. In your example, rcl_do_thing would enter this error handling mode for the extent of that if block statement. In a way, it's a poor man's try-catch block.

But I see now that arbitrarily nested error handling may occur (a try-catch in the catch block of a try-catch in the catch block of...). We would have to make it reentrant and a boolean flag isn't enough. An integer count could be used instead.

RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
}
Expand Down Expand Up @@ -164,6 +173,19 @@ rmw_return_loaned_message_from_publisher(
rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved

return rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
eprosima_fastrtps_identifier, node, publisher);
}
Expand Down
91 changes: 50 additions & 41 deletions rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"
Expand Down Expand Up @@ -51,31 +54,30 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
(void)keyed;
(void)create_publisher_listener;

if (!participant_info) {
RMW_SET_ERROR_MSG("participant_info is null");
return nullptr;
}

if (!topic_name || strlen(topic_name) == 0) {
RMW_SET_ERROR_MSG("publisher topic is null or empty string");
return nullptr;
}

if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_policies is null");
RMW_CHECK_ARGUMENT_FOR_NULL(participant_info, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
if (0 == strlen(topic_name)) {
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
return nullptr;
}

if (!publisher_options) {
RMW_SET_ERROR_MSG("publisher_options is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason);
return nullptr;
}
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

Participant * participant = participant_info->participant;
if (!participant) {
RMW_SET_ERROR_MSG("participant handle is null");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

const rosidl_message_type_support_t * type_support = get_message_typesupport_handle(
type_supports, rosidl_typesupport_introspection_c__identifier);
Expand Down Expand Up @@ -104,14 +106,25 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo");
return nullptr;
}
auto cleanup_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->type_support_) {
_unregister_type(participant, info->type_support_);
}
delete info->listener_;
delete info;
});

TypeSupportRegistry & type_registry = TypeSupportRegistry::get_instance();
auto type_impl = type_registry.get_message_type_support(type_support);
if (!type_impl) {
delete info;
RMW_SET_ERROR_MSG("failed to allocate type support");
return nullptr;
}
auto return_type_support = rcpputils::make_scope_exit(
[&type_registry, type_support]() {
type_registry.return_message_type_support(type_support);
});

info->typesupport_identifier_ = type_support->typesupport_identifier;
info->type_support_impl_ = type_impl;
Expand All @@ -126,7 +139,7 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
info->type_support_ = new (std::nothrow) TypeSupportProxy(type_impl);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("failed to allocate TypeSupportProxy");
goto fail;
return nullptr;
}
_register_type(participant, info->type_support_);
}
Expand All @@ -150,20 +163,19 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
// publisherParam.throughputController = throughputController;

if (!get_datawriter_qos(*qos_policies, publisherParam)) {
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
return nullptr;
}

info->listener_ = new (std::nothrow) PubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener");
goto fail;
return nullptr;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
return nullptr;
}

info->publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
Expand All @@ -172,32 +184,29 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
rmw_publisher = rmw_publisher_allocate();
if (!rmw_publisher) {
RMW_SET_ERROR_MSG("failed to allocate publisher");
goto fail;
return nullptr;
}
auto cleanup_publisher = rcpputils::make_scope_exit(
[rmw_publisher]() {
rmw_free(const_cast<char *>(rmw_publisher->topic_name));
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;
rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));

rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
if (!rmw_publisher->topic_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher topic name");
goto fail;
return nullptr;
}

memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);

rmw_publisher->options = *publisher_options;

cleanup_publisher.cancel();
cleanup_info.cancel();
return_type_support.cancel();
return rmw_publisher;

fail:
if (info) {
delete info->type_support_;
delete info->listener_;
delete info;
}
type_registry.return_message_type_support(type_support);
rmw_publisher_free(rmw_publisher);

return nullptr;
}
Loading