Skip to content

Commit

Permalink
Add support for user-specified content filters (#68)
Browse files Browse the repository at this point in the history
* Add support for user-specified content filters.

Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* - Resolve memory leak of custom content-filter resources
- Add missing package dependencies for rti_connext_dds_custom_sql_filter
- Clean up all participants upon factory finalization
- Reset context state upon finalization (rmw_connextddsmicro)
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Assume non-null options argument
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* - Return error when retrieving content-filter from a subscription that doesn't have one.
- Rename internal functions related to content-filters
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Fix compilation error, oops.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* - Define RMW_CONNEXT_DEBUG when building Debug libraries.
- Make sure participant is enabled before deleting contained entities when using Connext debug libraries.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Resolve memory leak for finalization on error.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Rename content filter public API.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Add client/service QoS getters (#67)

Signed-off-by: Mauro Passerino <mpasserino@irobot.com>

* Changelogs

Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>

* 0.8.1

* Fix cpplint errors (#69)

* Use static_cast instead of C-style cast

Fixes cpplint error.

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* Update NOLINT category

Relates to ament/ament_lint#324

Signed-off-by: Jacob Perron <jacob@openrobotics.org>

* 0.8.2

Signed-off-by: Audrow Nash <audrow@hey.com>

* Update rti-connext-dds dependency to 6.0.1. (#71)

Now that this package is available in the ROS bootstrap repository for Ubuntu Focal and Jammy we can bump the expected dependency version.

* 0.8.3

* Add rmw listener apis (#44)

* Add stubs for setting listener callbacks

Signed-off-by: Mauro Passerino <mpasserino@irobot.com>

* Address PR suggestions

Signed-off-by: Mauro Passerino <mpasserino@irobot.com>

* Fix linter issues

Signed-off-by: Mauro Passerino <mpasserino@irobot.com>

Co-authored-by: Mauro Passerino <mpasserino@irobot.com>
Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com>

* Changelog. (#73)

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* 0.9.0

* add stub for content filtered topic

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* * Rebased branch asorbini/cft on top of 0.9.0.
* Resolved CFT finalization issues on error.
* Verified and cleaned up build for rmw_connextddsmicro.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Move custom SQL filter to rmw_connextdds_common
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Try to resolve linking error on Windows.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Optionally disable writer-side CFT optimizations to support Windows.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* No need to declare private CFT function on Windows.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* remove stub implementation for ContentFilteredTopic.

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>

* address cpplint error.

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>

* Avoid conversion warnings on Windows.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Use strtol instead of sscanf to avoid warnings on Windows.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

* Avoid finalizing participants if factory is not available.
Signed-off-by: Andrea Sorbini <asorbini@rti.com>

Co-authored-by: mauropasse <mauropasse@hotmail.com>
Co-authored-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
Co-authored-by: Jacob Perron <jacob@openrobotics.org>
Co-authored-by: Audrow Nash <audrow@hey.com>
Co-authored-by: Steven! Ragnarök <nuclearsandwich@users.noreply.github.com>
Co-authored-by: Steven! Ragnarök <steven@nuclearsandwich.com>
Co-authored-by: iRobot ROS <49500531+irobot-ros@users.noreply.github.com>
Co-authored-by: Mauro Passerino <mpasserino@irobot.com>
Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com>
Co-authored-by: Chris Lalancette <clalancette@openrobotics.org>
Co-authored-by: Chen Lihui <lihui.chen@sony.com>
Co-authored-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
  • Loading branch information
13 people authored Mar 22, 2022
1 parent 898e570 commit 2c97c79
Show file tree
Hide file tree
Showing 19 changed files with 1,566 additions and 154 deletions.
16 changes: 4 additions & 12 deletions rmw_connextdds/src/rmw_api_impl_ndds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,33 +716,25 @@ rmw_subscription_get_actual_qos(
return rmw_api_connextdds_subscription_get_actual_qos(subscription, qos);
}


rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
UNUSED_ARG(subscription);
UNUSED_ARG(options);
RMW_CONNEXT_LOG_NOT_IMPLEMENTED
return RMW_RET_UNSUPPORTED;
return rmw_api_connextdds_subscription_set_content_filter(
subscription, options);
}


rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options)
{
UNUSED_ARG(subscription);
UNUSED_ARG(allocator);
UNUSED_ARG(options);
RMW_CONNEXT_LOG_NOT_IMPLEMENTED
return RMW_RET_UNSUPPORTED;
return rmw_api_connextdds_subscription_get_content_filter(
subscription, allocator, options);
}


rmw_ret_t
rmw_destroy_subscription(
rmw_node_t * node,
Expand Down
11 changes: 11 additions & 0 deletions rmw_connextdds_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ function(rtirmw_add_library)
ament_target_dependencies(${_rti_build_NAME}
${_rti_build_DEPS})

set(_extra_defines)
if("${CMAKE_BUILD_TYPE}" MATCHES "[dD]ebug")
list(APPEND _extra_defines "RMW_CONNEXT_DEBUG=1")
endif()

target_compile_definitions(${_rti_build_NAME}
PUBLIC
RMW_VERSION_MAJOR=${rmw_VERSION_MAJOR}
RMW_VERSION_MINOR=${rmw_VERSION_MINOR}
RMW_VERSION_PATCH=${rmw_VERSION_PATCH}
RMW_CONNEXT_DDS_API=RMW_CONNEXT_DDS_API_${_rti_build_API}
${_rti_build_DEFINES}
${_extra_defines}
)

set(private_defines)
Expand Down Expand Up @@ -189,6 +195,9 @@ else()
if("${CONNEXTDDS_VERSION}" VERSION_LESS "6.0.0")
list(APPEND extra_defines "RMW_CONNEXT_DDS_API_PRO_LEGACY=1")
endif()
if(CONNEXTDDS_ARCH MATCHES ".*Win.*")
list(APPEND extra_defines "RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE=1")
endif()
rtirmw_add_library(
NAME ${PROJECT_NAME}_pro
API PRO
Expand All @@ -197,8 +206,10 @@ else()
src/ndds/rmw_type_support_ndds.cpp
src/ndds/rmw_typecode.cpp
src/ndds/dds_api_ndds.cpp
src/ndds/custom_sql_filter.cpp
include/rmw_connextdds/typecode.hpp
include/rmw_connextdds/dds_api_ndds.hpp
include/rmw_connextdds/custom_sql_filter.hpp
DEPS ${RMW_CONNEXT_DEPS}
LIBRARIES RTIConnextDDS::c_api
DEFINES ${extra_defines})
Expand Down
66 changes: 66 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/custom_sql_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 Real-Time Innovations, Inc. (RTI)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_
#define RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_

#include "rmw_connextdds/dds_api.hpp"

#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO

namespace rti_connext_dds_custom_sql_filter
{

struct CustomSqlFilterData
{
DDS_SqlFilterGeneratorQos base;

CustomSqlFilterData();

DDS_ReturnCode_t
set_memory_management_property(
const DDS_DomainParticipantQos & dp_qos);
};

RMW_CONNEXTDDS_PUBLIC
DDS_ReturnCode_t
register_content_filter(
DDS_DomainParticipant * const participant,
CustomSqlFilterData * const filter_data);

RMW_CONNEXTDDS_PUBLIC
extern const char * const PLUGIN_NAME;

} // namespace rti_connext_dds_custom_sql_filter

#if !RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE
extern "C" {
// This is an internal function from RTI Connext DDS which allows a filter to
// be registered as "built-in". We need this because we want this custom filter
// to be a replacement for the built-in SQL-like filter.
RMW_CONNEXTDDS_PUBLIC
DDS_ReturnCode_t
DDS_ContentFilter_register_filter(
DDS_DomainParticipant * participant,
const char * name,
const struct DDS_ContentFilter * filter,
const DDS_ContentFilterEvaluateFunction evaluateOnSerialized,
const DDS_ContentFilterWriterEvaluateFunction writerEvaluateOnSerialized,
const DDS_ContentFilterQueryFunction query,
DDS_Boolean isBuiltin);
}
#endif // RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE

#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO

#endif // RMW_CONNEXTDDS__CUSTOM_SQL_FILTER_HPP_
18 changes: 18 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ rmw_connextdds_initialize_participant_qos_impl(
rmw_context_impl_t * const ctx,
DDS_DomainParticipantQos * const dp_qos);

rmw_ret_t
rmw_connextdds_configure_participant(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const participant);

rmw_ret_t
rmw_connextdds_create_contentfilteredtopic(
rmw_context_impl_t * const ctx,
DDS_DomainParticipant * const dp,
DDS_Topic * const base_topic,
const char * const cft_name,
const char * const cft_filter,
const rcutils_string_array_t * const cft_expression_parameters,
DDS_TopicDescription ** const cft_out);

rmw_ret_t
Expand Down Expand Up @@ -258,4 +264,16 @@ rmw_connextdds_enable_security(
DDS_SECURITY_PROPERTY_PREFIX ".logging.log_level"
#endif /* DDS_SECURITY_LOGGING_LEVEL_PROPERTY */

rmw_ret_t
rmw_connextdds_set_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
const char * const cft_expression,
const rcutils_string_array_t * const cft_expression_parameters);

rmw_ret_t
rmw_connextdds_get_cft_filter_expression(
DDS_TopicDescription * const topic_desc,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * const options);

#endif // RMW_CONNEXTDDS__DDS_API_HPP_
12 changes: 0 additions & 12 deletions rmw_connextdds_common/include/rmw_connextdds/dds_api_ndds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,6 @@

#include "rcutils/types.h"

DDS_SEQUENCE(RMW_Connext_Uint8ArrayPtrSeq, rcutils_uint8_array_t *);

typedef RMW_Connext_Uint8ArrayPtrSeq RMW_Connext_UntypedSampleSeq;

#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER

#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \
*RMW_Connext_Uint8ArrayPtrSeq_get_reference(seq_, i_)

#define DDS_UntypedSampleSeq_get_length(seq_) \
RMW_Connext_Uint8ArrayPtrSeq_get_length(seq_)

#if RMW_CONNEXT_DDS_API_PRO_LEGACY
#ifndef RTIXCdrLong_MAX
#define RTIXCdrLong_MAX 2147483647
Expand Down
12 changes: 12 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_api_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ rmw_api_connextdds_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_set_content_filter(
rmw_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
rmw_api_connextdds_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * const allocator,
rmw_subscription_content_filter_options_t * options);

RMW_CONNEXTDDS_PUBLIC
rmw_ret_t
Expand Down
22 changes: 22 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ class RMW_Connext_Subscriber
rmw_message_info_t * const message_info,
bool * const taken);

rmw_ret_t
set_content_filter(
const rmw_subscription_content_filter_options_t * const options);

rmw_ret_t
get_content_filter(
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * const options);

bool
has_data()
{
Expand Down Expand Up @@ -470,6 +479,17 @@ class RMW_Connext_Subscriber
return this->dds_topic;
}

static std::string get_atomic_id()
{
static std::atomic_uint64_t id;
return std::to_string(id++);
}

bool is_cft_enabled()
{
return !this->cft_expression.empty();
}

const bool internal;
const bool ignore_local;

Expand All @@ -478,6 +498,7 @@ class RMW_Connext_Subscriber
DDS_DataReader * dds_reader;
DDS_Topic * dds_topic;
DDS_TopicDescription * dds_topic_cft;
std::string cft_expression;
RMW_Connext_MessageTypeSupport * type_support;
rmw_gid_t ros_gid;
const bool created_topic;
Expand All @@ -496,6 +517,7 @@ class RMW_Connext_Subscriber
const bool ignore_local,
const bool created_topic,
DDS_TopicDescription * const dds_topic_cft,
const char * const cft_expression,
const bool internal);

friend class RMW_Connext_SubscriberStatusCondition;
Expand Down
15 changes: 15 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/static_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
#ifndef RMW_CONNEXTDDS__STATIC_CONFIG_HPP_
#define RMW_CONNEXTDDS__STATIC_CONFIG_HPP_

/******************************************************************************
* Debug flags
******************************************************************************/
#ifndef RMW_CONNEXT_DEBUG
#define RMW_CONNEXT_DEBUG 0
#endif // RMW_CONNEXT_DEBUG

/******************************************************************************
* Default User Configuration
******************************************************************************/
Expand Down Expand Up @@ -311,6 +318,14 @@
#define RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE 1
#endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */

/******************************************************************************
* On windows, the custom SQL filter cannot be registered as "built-in", so we
* must enable some additional code to register it as a user plugin.
******************************************************************************/
#ifndef RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE
#define RMW_CONNEXT_BUILTIN_CFT_COMPATIBILITY_MODE 0
#endif /* RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE */

#include "resource_limits.hpp"

#endif // RMW_CONNEXTDDS__STATIC_CONFIG_HPP_
30 changes: 27 additions & 3 deletions rmw_connextdds_common/include/rmw_connextdds/type_support.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,21 @@ class RMW_Connext_MessageTypeSupport

struct RMW_Connext_Message
{
const void * user_data;
bool serialized;
RMW_Connext_MessageTypeSupport * type_support;
const void * user_data{nullptr};
bool serialized{false};
RMW_Connext_MessageTypeSupport * type_support{nullptr};
rcutils_uint8_array_t data_buffer;
};

rmw_ret_t
RMW_Connext_Message_initialize(
RMW_Connext_Message * const self,
RMW_Connext_MessageTypeSupport * const type_support,
const size_t data_buffer_size);

void
RMW_Connext_Message_finalize(RMW_Connext_Message * const self);

class RMW_Connext_ServiceTypeSupportWrapper
{
public:
Expand Down Expand Up @@ -276,5 +286,19 @@ class RMW_Connext_ServiceTypeSupportWrapper
const rosidl_service_type_support_t * const type_supports);
};

#if RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
DDS_SEQUENCE(RMW_Connext_MessagePtrSeq, RMW_Connext_Message *);

typedef RMW_Connext_MessagePtrSeq RMW_Connext_UntypedSampleSeq;

#define RMW_Connext_UntypedSampleSeq_INITIALIZER DDS_SEQUENCE_INITIALIZER

#define DDS_UntypedSampleSeq_get_reference(seq_, i_) \
*RMW_Connext_MessagePtrSeq_get_reference(seq_, i_)

#define DDS_UntypedSampleSeq_get_length(seq_) \
RMW_Connext_MessagePtrSeq_get_length(seq_)

#endif // RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO

#endif // RMW_CONNEXTDDS__TYPE_SUPPORT_HPP_
25 changes: 23 additions & 2 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only)
return RMW_RET_ERROR;
}

rmw_ret_t cfg_rc = rmw_connextdds_configure_participant(this, this->participant);
if (RMW_RET_OK != cfg_rc) {
RMW_CONNEXT_LOG_ERROR("failed to configure DDS participant")
return cfg_rc;
}

/* Create DDS publisher/subscriber objects that will be used for all DDS
writers/readers created to support RMW publishers/subscriptions. */

Expand Down Expand Up @@ -332,7 +338,21 @@ rmw_ret_t
rmw_context_impl_t::finalize_participant()
{
RMW_CONNEXT_LOG_DEBUG("finalizing DDS DomainParticipant")

#if RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
// If we are building in Debug mode, an issue in Connext may prevent the
// participant from being able to delete any content-filtered topic if
// the participant has not been enabled.
// For this reason, make sure to enable the participant before trying to
// finalize it.
// TODO(asorbini) reconsider the need for this code in Connext > 6.1.0
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_DomainParticipant_as_entity(participant)))
{
RMW_CONNEXT_LOG_ERROR_SET(
"failed to enable DomainParticipant before deletion")
return RMW_RET_ERROR;
}
#endif // RMW_CONNEXT_DEBUG && RMW_CONNEXT_DDS_API == RMW_CONNEXT_DDS_API_PRO
if (RMW_RET_OK != rmw_connextdds_graph_finalize(this)) {
RMW_CONNEXT_LOG_ERROR("failed to finalize graph cache")
return RMW_RET_ERROR;
Expand Down Expand Up @@ -386,7 +406,7 @@ rmw_context_impl_t::finalize_participant()
if (nullptr != this->participant) {
// If we are cleaning up after some RMW failure, it is possible for some
// DataWriter to not have been deleted.
// Call DDS_Publisher_delete_contained_entities() to make sure we can
// Call DDS_DomainParticipant_delete_contained_entities() to make sure we can
// dispose the publisher.
if (DDS_RETCODE_OK !=
DDS_DomainParticipant_delete_contained_entities(this->participant))
Expand All @@ -402,6 +422,7 @@ rmw_context_impl_t::finalize_participant()
RMW_CONNEXT_LOG_ERROR_SET("failed to delete DDS participant")
return RMW_RET_ERROR;
}

this->participant = nullptr;
}

Expand Down
Loading

0 comments on commit 2c97c79

Please sign in to comment.