Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for content filtered topics #302

Merged
merged 14 commits into from
Mar 21, 2022
1 change: 1 addition & 0 deletions rmw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ set(rmw_sources
"src/qos_string_conversions.c"
"src/sanity_checks.c"
"src/security_options.c"
"src/subscription_content_filter_options.c"
"src/subscription_options.c"
"src/time.c"
"src/topic_endpoint_info_array.c"
Expand Down
65 changes: 65 additions & 0 deletions rmw/include/rmw/rmw.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ extern "C"
#include <stddef.h>
#include <stdint.h>

#include "rcutils/allocator.h"
#include "rcutils/macros.h"
#include "rcutils/types.h"

Expand Down Expand Up @@ -1117,6 +1118,70 @@ rmw_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos);

/// Set the content filter options for the subscription.
/**
* This function will set a filter expression and an array of expression parameters
* for the given subscription.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | No
* Thread-Safe | No
* Uses Atomics | Maybe [1]
* Lock-Free | Maybe [1]
* <i>[1] implementation defined, check the implementation documentation</i>
*
* \param[in] subscription The subscription to set content filter options.
* \param[in] options The content filter options.
* Use `options.filter_expression` with an empty("") string to
* reset/clean content filtered topic for the subscription.
* \return `RMW_RET_OK` if successful, or
* \return `RMW_RET_INVALID_ARGUMENT` if an argument is null, or
* \return `RMW_RET_INCORRECT_RMW_IMPLEMENTATION` if the `subscription` implementation
* identifier does not match this implementation, or
* \return `RMW_RET_UNSUPPORTED` if the implementation does not support content filtered topic, or
* \return `RMW_RET_ERROR` if an unspecified error occurs.
*/
RMW_PUBLIC
RMW_WARN_UNUSED
rmw_ret_t
rmw_subscription_set_content_filter(
rmw_subscription_t * subscription,
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
const rmw_subscription_content_filter_options_t * options);

/// Retrieve the content filter options of the subscription.
/**
* This function will return a content filter options by the given subscription.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | Maybe [1]
* Lock-Free | Maybe [1]
* <i>[1] implementation defined, check the implementation documentation</i>
*
* \param[in] subscription The subscription object to inspect.
* \param[in] allocator Allocator to be used when populating the content filter options.
* \param[out] options The content filter options.
* \return `RMW_RET_OK` if successful, or
* \return `RMW_RET_INVALID_ARGUMENT` if an argument is null, or
* \return `RMW_RET_INCORRECT_RMW_IMPLEMENTATION` if the `subscription` implementation
* identifier does not match this implementation, or
* \return `RMW_RET_BAD_ALLOC` if memory allocation fails, or
* \return `RMW_RET_UNSUPPORTED` if the implementation does not support content filtered topic, or
* \return `RMW_RET_ERROR` if an unspecified error occurs.
*/
RMW_PUBLIC
RMW_WARN_UNUSED
rmw_ret_t
rmw_subscription_get_content_filter(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options);

/// Take an incoming ROS message.
/**
* Take a ROS message already received by the given subscription, removing it from internal queues.
Expand Down
130 changes: 130 additions & 0 deletions rmw/include/rmw/subscription_content_filter_options.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RMW__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_H_
#define RMW__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_H_

#ifdef __cplusplus
extern "C"
{
#endif

#include "rcutils/allocator.h"
#include "rcutils/types.h"

#include "rmw/macros.h"
#include "rmw/ret_types.h"
#include "rmw/visibility_control.h"

typedef struct RMW_PUBLIC_TYPE rmw_subscription_content_filter_options_s
{
/**
* Specify the criteria to select the data samples of interest.
*
* It is similar to the WHERE part of an SQL clause.
*/
char * filter_expression;
/**
* Give values to the tokens placeholder ‘parameters’ (i.e., "%n" tokens begin from 0) in the
* filter_expression. The number of supplied parameters must fit with the requested values.
*
* It can be NULL if there is no "%n" tokens placeholder in filter_expression.
* The maximum index number must be smaller than 100.
*/
rcutils_string_array_t expression_parameters;
} rmw_subscription_content_filter_options_t;


/// Get zero initialized content filter options.
RMW_PUBLIC
rmw_subscription_content_filter_options_t
rmw_get_zero_initialized_content_filter_options();


/// Initialize the given content filter options.
/**
* \param[in] filter_expression The filter expression.
* \param[in] expression_parameters_argc The expression parameters argc.
* \param[in] expression_parameter_argv The expression parameters argv.
* \param[in] allocator The allocator used when copying data to the content filter options.
* \param[out] options The content filter options to be set.
* \returns RMW_RET_INVALID_ARGUMENT, or
* \returns RMW_RET_BAD_ALLOC, or
* \returns RMW_RET_OK
*/
RMW_PUBLIC
rmw_ret_t
rmw_subscription_content_filter_options_init(
const char * filter_expression,
size_t expression_parameters_argc,
const char * expression_parameter_argv[],
const rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options);

/// Set the given content filter options.
/**
* \param[in] filter_expression The filter expression.
* \param[in] expression_parameters_argc The expression parameters argc.
* \param[in] expression_parameter_argv The expression parameters argv.
* \param[in] allocator The allocator used when copying data to the content filter options.
* \param[out] options The content filter options to be set.
* \returns RMW_RET_INVALID_ARGUMENT, or
* \returns RMW_RET_BAD_ALLOC, or
* \returns RMW_RET_OK
*/
RMW_PUBLIC
rmw_ret_t
rmw_subscription_content_filter_options_set(
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
const char * filter_expression,
size_t expression_parameters_argc,
const char * expression_parameter_argv[],
const rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * options);

/// Copy the given content filter options.
/**
* \param[in] src content filter options to be copied.
* \param[in] allocator allocator used when copying data to the new content filter options.
* \param[out] dst content filter options to be set.
* \returns RMW_RET_INVALID_ARGUMENT, or
* \returns RMW_RET_BAD_ALLOC, or
* \returns RMW_RET_OK
*/
RMW_PUBLIC
rmw_ret_t
rmw_subscription_content_filter_options_copy(
const rmw_subscription_content_filter_options_t * src,
const rcutils_allocator_t * allocator,
rmw_subscription_content_filter_options_t * dst);


/// Finalize the content filter options.
/**
* \param[in] options content filter options to be finalized.
* \param[in] allocator allocator used to deallocate the content filter options.
* \returns RMW_RET_INVALID_ARGUMENT, or
* \returns RMW_RET_ERROR, or
* \returns RMW_RET_OK
*/
RMW_PUBLIC
rmw_ret_t
rmw_subscription_content_filter_options_fini(
rmw_subscription_content_filter_options_t * options,
const rcutils_allocator_t * allocator);

#ifdef __cplusplus
}
#endif

#endif // RMW__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_H_
7 changes: 7 additions & 0 deletions rmw/include/rmw/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C"
#include "rmw/ret_types.h"
#include "rmw/security_options.h"
#include "rmw/serialized_message.h"
#include "rmw/subscription_content_filter_options.h"
#include "rmw/time.h"
#include "rmw/visibility_control.h"

Expand Down Expand Up @@ -177,6 +178,9 @@ typedef struct RMW_PUBLIC_TYPE rmw_subscription_options_s
* Default value is RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED.
*/
rmw_unique_network_flow_endpoints_requirement_t require_unique_network_flow_endpoints;

/// Used to create a content filter options during subscription creation.
rmw_subscription_content_filter_options_t * content_filter_options;
} rmw_subscription_options_t;

typedef struct RMW_PUBLIC_TYPE rmw_subscription_s
Expand All @@ -203,6 +207,9 @@ typedef struct RMW_PUBLIC_TYPE rmw_subscription_s

/// Indicates whether this subscription can loan messages
bool can_loan_messages;

/// Indicates whether content filtered topic of this subscription is enabled
bool is_cft_enabled;
} rmw_subscription_t;

/// A handle to an rmw service
Expand Down
Loading