Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique network flows #502

Merged
merged 15 commits into from
Apr 5, 2021
Merged
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ add_library(rmw_fastrtps_cpp
src/serialization_format.cpp
src/subscription.cpp
src/type_support_common.cpp
src/rmw_get_endpoint_network_flow.cpp
)
target_link_libraries(rmw_fastrtps_cpp
fastcdr fastrtps)
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ rmw_fastrtps_cpp::create_publisher(
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

if (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED ==
publisher_options->require_unique_network_flow_endpoints)
{
RMW_SET_ERROR_MSG("Unique network flow endpoints not supported on publishers");
return nullptr;
}

Participant * participant = participant_info->participant;
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

Expand Down
46 changes: 46 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_get_endpoint_network_flow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/get_network_flow_endpoints.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/types.h"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"

extern "C"
{
rmw_ret_t
rmw_publisher_get_network_flow_endpoints(
const rmw_publisher_t * publisher,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_get_network_flow_endpoints(
publisher,
allocator,
network_flow_endpoint_array);
}

rmw_ret_t
rmw_subscription_get_network_flow_endpoints(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_network_flow_endpoints(
subscription,
allocator,
network_flow_endpoint_array);
}
} // extern "C"
36 changes: 35 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
using Domain = eprosima::fastrtps::Domain;
using Participant = eprosima::fastrtps::Participant;
using TopicDataType = eprosima::fastrtps::TopicDataType;
using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper;
using XMLProfileManager = eprosima::fastrtps::xmlparser::XMLProfileManager;

namespace rmw_fastrtps_cpp
Expand Down Expand Up @@ -169,7 +170,40 @@ create_subscription(
}
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
eprosima::fastrtps::SubscriberAttributes originalParam = subscriberParam;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
subscriberParam.properties,
"fastdds.unique_network_flows"))
{
subscriberParam.properties.properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}

info->subscriber_ = Domain::createSubscriber(
participant,
subscriberParam,
info->listener_);
if (!info->subscriber_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->subscriber_ = Domain::createSubscriber(
participant,
originalParam,
info->listener_);
}
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions rmw_fastrtps_dynamic_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ add_library(rmw_fastrtps_dynamic_cpp
src/type_support_common.cpp
src/type_support_proxy.cpp
src/type_support_registry.cpp
src/rmw_get_endpoint_network_flow.cpp
)
target_link_libraries(rmw_fastrtps_dynamic_cpp
fastcdr fastrtps)
Expand Down
12 changes: 11 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

if (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED ==
publisher_options->require_unique_network_flow_endpoints)
{
RMW_SET_ERROR_MSG("Unique network flow endpoints not supported on publishers");
return nullptr;
}

Participant * participant = participant_info->participant;
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

Expand Down Expand Up @@ -193,7 +200,10 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
return nullptr;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_);
info->publisher_ = Domain::createPublisher(
participant,
publisherParam,
info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
return nullptr;
Expand Down
46 changes: 46 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_get_endpoint_network_flow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/types.h"
#include "rmw/get_network_flow_endpoints.h"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"

extern "C"
{
rmw_ret_t
rmw_publisher_get_network_flow_endpoints(
const rmw_publisher_t * publisher,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_get_network_flow_endpoints(
publisher,
allocator,
network_flow_endpoint_array);
}

rmw_ret_t
rmw_subscription_get_network_flow_endpoints(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_network_flow_endpoints(
subscription,
allocator,
network_flow_endpoint_array);
}
} // extern "C"
36 changes: 35 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
using Domain = eprosima::fastrtps::Domain;
using Participant = eprosima::fastrtps::Participant;
using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper;
using TopicDataType = eprosima::fastrtps::TopicDataType;
using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;
using XMLProfileManager = eprosima::fastrtps::xmlparser::XMLProfileManager;
Expand Down Expand Up @@ -180,7 +181,40 @@ create_subscription(
return nullptr;
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
eprosima::fastrtps::SubscriberAttributes originalParam = subscriberParam;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
subscriberParam.properties,
"fastdds.unique_network_flows"))
{
subscriberParam.properties.properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}

info->subscriber_ = Domain::createSubscriber(
participant,
subscriberParam,
info->listener_);
if (!info->subscriber_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->subscriber_ = Domain::createSubscriber(
participant,
originalParam,
info->listener_);
}
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ add_library(rmw_fastrtps_shared_cpp
src/rmw_wait_set.cpp
src/subscription.cpp
src/TypeSupport_impl.cpp
src/rmw_get_endpoint_network_flow.cpp
)
target_include_directories(rmw_fastrtps_shared_cpp
PUBLIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rmw/topic_endpoint_info_array.h"
#include "rmw/types.h"
#include "rmw/names_and_types.h"
#include "rmw/network_flow_endpoint_array.h"

namespace rmw_fastrtps_shared_cpp
{
Expand Down Expand Up @@ -401,6 +402,20 @@ __rmw_qos_profile_check_compatible(
char * reason,
size_t reason_size);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_get_network_flow_endpoints(
const rmw_publisher_t * publisher,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_get_network_flow_endpoints(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array);

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__RMW_COMMON_HPP_
Loading