Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for user-specified content filters #68

Merged
merged 30 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1e0027e
Add support for user-specified content filters.
asorbini Oct 22, 2021
d4788f9
- Resolve memory leak of custom content-filter resources
asorbini Oct 25, 2021
d9a2ba0
Assume non-null options argument
asorbini Oct 25, 2021
1c313e3
- Return error when retrieving content-filter from a subscription tha…
asorbini Oct 25, 2021
3c532ce
Fix compilation error, oops.
asorbini Oct 25, 2021
669ff31
- Define RMW_CONNEXT_DEBUG when building Debug libraries.
asorbini Oct 25, 2021
13087f1
Resolve memory leak for finalization on error.
asorbini Oct 26, 2021
883e9cf
Rename content filter public API.
asorbini Oct 26, 2021
ced2a15
Add client/service QoS getters (#67)
mauropasse Nov 19, 2021
896839f
Changelogs
ivanpauno Nov 19, 2021
a3bbfb6
0.8.1
ivanpauno Nov 19, 2021
8b10deb
Fix cpplint errors (#69)
jacobperron Jan 12, 2022
099692a
0.8.2
paudrow Jan 15, 2022
70d6e2e
Update rti-connext-dds dependency to 6.0.1. (#71)
nuclearsandwich Feb 10, 2022
bbdf5af
0.8.3
nuclearsandwich Feb 10, 2022
4129331
Add rmw listener apis (#44)
Feb 24, 2022
49f3b55
Changelog. (#73)
clalancette Mar 1, 2022
4becb93
0.9.0
clalancette Mar 1, 2022
203f820
add stub for content filtered topic
Mar 18, 2022
5a36784
* Rebased branch asorbini/cft on top of 0.9.0.
asorbini Mar 19, 2022
d43b032
Move custom SQL filter to rmw_connextdds_common
asorbini Mar 19, 2022
60a347c
Try to resolve linking error on Windows.
asorbini Mar 19, 2022
d7d211c
Optionally disable writer-side CFT optimizations to support Windows.
asorbini Mar 20, 2022
ad985bc
No need to declare private CFT function on Windows.
asorbini Mar 20, 2022
f44dceb
Merge branch 'master' into asorbini/cft
fujitatomoya Mar 21, 2022
d30bb6e
remove stub implementation for ContentFilteredTopic.
fujitatomoya Mar 21, 2022
995e244
address cpplint error.
fujitatomoya Mar 21, 2022
b12c6b2
Avoid conversion warnings on Windows.
asorbini Mar 21, 2022
b1b5851
Use strtol instead of sscanf to avoid warnings on Windows.
asorbini Mar 21, 2022
3edb5ae
Avoid finalizing participants if factory is not available.
asorbini Mar 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rmw_connextdds/src/rmw_api_impl_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,24 @@ rmw_subscription_get_actual_qos(
return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
return rmw_api_connextdds_subscription_set_content_filter(
subscription, options);
}

rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
return rmw_api_connextdds_subscription_get_content_filter(
subscription, allocator, options);
}

rmw_ret_t
rmw_destroy_subscription(
Expand Down
9 changes: 8 additions & 1 deletion rmw_connextdds_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ function(rtirmw_add_library)
ament_target_dependencies(${_rti_build_NAME}
${_rti_build_DEPS})

set(_extra_defines)
if("${CMAKE_BUILD_TYPE}" MATCHES "[dD]ebug")
list(APPEND _extra_defines "RMW_CONNEXT_DEBUG=1")
endif()

target_compile_definitions(${_rti_build_NAME}
PUBLIC
RMW_VERSION_MAJOR=${rmw_VERSION_MAJOR}
RMW_VERSION_MINOR=${rmw_VERSION_MINOR}
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
RMW_CONNEXT_DDS_API=RMW_CONNEXT_DDS_API_${_rti_build_API}
${_rti_build_DEFINES}
${_extra_defines}
)

set(private_defines)
Expand Down Expand Up @@ -112,7 +118,8 @@ set(RMW_CONNEXT_DEPS
rosidl_typesupport_fastrtps_cpp
rosidl_typesupport_introspection_c
rosidl_typesupport_introspection_cpp
rti_connext_dds_cmake_module)
rti_connext_dds_cmake_module
rti_connext_dds_custom_sql_filter)

foreach(pkg_dep ${RMW_CONNEXT_DEPS})
find_package(${pkg_dep} REQUIRED)
Expand Down
18 changes: 18 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ rmw_connextdds_initialize_participant_qos_impl(
rmw_context_impl_t * const ctx,
DDS_DomainParticipantQos * const dp_qos);

rmw_ret_t
rmw_connextdds_configure_participant(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const participant);

rmw_ret_t
rmw_connextdds_create_contentfilteredtopic(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Topic * const base_topic,
const char * const cft_name,
const char * const cft_filter,
const rcutils_string_array_t * const cft_expression_parameters,
DDS_TopicDescription ** const cft_out);

rmw_ret_t
Expand Down Expand Up @@ -258,4 +264,16 @@ rmw_connextdds_enable_security(
DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level"
#endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */

rmw_ret_t
rmw_connextdds_set_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
const char * const cft_expression,
const rcutils_string_array_t * const cft_expression_parameters);

rmw_ret_t
rmw_connextdds_get_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * const options);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
12 changes: 12 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,18 @@ rmw_api_connextdds_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
Expand Down
22 changes: 22 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber
rmw_message_info_t * const message_info,
bool * const taken);

rmw_ret_t
set_content_filter(
const rmw_subscription_content_filter_options_t * const options);

rmw_ret_t
get_content_filter(
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * const options);

bool
has_data()
{
Expand Down Expand Up @@ -470,6 +479,17 @@ class RMW_Connext_Subscriber
return this->dds_topic;
}

static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}

bool is_cft_enabled()
{
return !this->cft_expression.empty();
}

const bool internal;
const bool ignore_local;

Expand All @@ -478,6 +498,7 @@ class RMW_Connext_Subscriber
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
std::string cft_expression;
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
Expand All @@ -496,6 +517,7 @@ class RMW_Connext_Subscriber
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);

friend class RMW_Connext_SubscriberStatusCondition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
#ifndef RMW_CONNEXTDDS__STATIC_CONFIG_HPP_
#define RMW_CONNEXTDDS__STATIC_CONFIG_HPP_

/******************************************************************************
* Debug flags
******************************************************************************/
#ifndef RMW_CONNEXT_DEBUG
#define RMW_CONNEXT_DEBUG 0
#endif // RMW_CONNEXT_DEBUG

/******************************************************************************
* Default User Configuration
******************************************************************************/
Expand Down
1 change: 1 addition & 0 deletions rmw_connextdds_common/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<buildtool_export_depend>ament_cmake</buildtool_export_depend>

<depend>rti_connext_dds_cmake_module</depend>
<depend>rti_connext_dds_custom_sql_filter</depend>
<depend>fastcdr</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>
Expand Down
25 changes: 23 additions & 2 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only)
return RMW_RET_ERROR;
}

rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant);
if (RMW_RET_OK != cfg_rc) {
RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant")
return cfg_rc;
}

/* Create DDS publisher/subscriber objects that will be used for all DDS
writers/readers created to support RMW publishers/subscriptions. */

Expand Down Expand Up @@ -332,7 +338,21 @@ rmw_ret_t
rmw_context_impl_t::finalize_participant()
{
RMW_CONNEXT_LOG_DEBUG("finalizing DDS DomainParticipant")

#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
// If we are building in Debug mode, an issue in Connext may prevent the
// participant from being able to delete any content-filtered topic if
// the participant has not been enabled.
// For this reason, make sure to enable the participant before trying to
// finalize it.
// TODO(asorbini) reconsider the need for this code in Connext > 6.1.0
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant)))
{
RMW_CONNEXT_LOG_ERROR_SET(
"failed to enable DomainParticipant before deletion")
return RMW_RET_ERROR;
}
#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
if (RMW_RET_OK != rmw_connextdds_graph_finalize(this)) {
RMW_CONNEXT_LOG_ERROR("failed to finalize graph cache")
return RMW_RET_ERROR;
Expand Down Expand Up @@ -386,7 +406,7 @@ rmw_context_impl_t::finalize_participant()
if (nullptr != this->participant) {
// If we are cleaning up after some RMW failure, it is possible for some
// DataWriter to not have been deleted.
// Call DDS_Publisher_delete_contained_entities() to make sure we can
// Call DDS_DomainParticipant_delete_contained_entities() to make sure we can
// dispose the publisher.
if (DDS_RETCODE_OK !=
DDS_DomainParticipant_delete_contained_entities(this->participant))
Expand All @@ -402,6 +422,7 @@ rmw_context_impl_t::finalize_participant()
RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant")
return RMW_RET_ERROR;
}

this->participant = nullptr;
}

Expand Down
Loading