Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose allocator interface to publishers and subscriptions #117

Closed
wants to merge 18 commits into from
Closed
56 changes: 56 additions & 0 deletions rclcpp/include/rclcpp/allocator_deleter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP_RCLCPP_ALLOCATOR_DELETER_HPP_
#define RCLCPP_RCLCPP_ALLOCATOR_DELETER_HPP_

#include <memory>

template<typename T, typename Allocator>
class AllocatorDeleter
{
public:
AllocatorDeleter()
: allocator_(new Allocator)
{
}

AllocatorDeleter(Allocator * a)
: allocator_(a)
{
}

template<typename U, typename B>
AllocatorDeleter(const AllocatorDeleter<U, B> & a)
{
allocator_ = a.get_allocator();
}

void operator()(T * ptr)
{
std::allocator_traits<Allocator>::destroy(*allocator_, ptr);
std::allocator_traits<Allocator>::deallocate(*allocator_, ptr, sizeof(T));
ptr = NULL;
}

Allocator * get_allocator() const
{
return allocator_;
}

private:
Allocator * allocator_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be private / protected?

};

#endif
141 changes: 141 additions & 0 deletions rclcpp/include/rclcpp/allocator_wrapper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP_RCLCPP_ALLOCATOR_WRAPPER_HPP_
#define RCLCPP_RCLCPP_ALLOCATOR_WRAPPER_HPP_

#include <memory>
#include <iostream>
#include <rclcpp/allocator_deleter.hpp>


template<typename Alloc, typename T, typename D>
void initialize_deleter(D * deleter, Alloc * alloc)
{
(void) deleter;
(void) alloc;
throw std::runtime_error("Reached unexpected template specialization");
}

template<typename T>
void initialize_deleter(std::default_delete<T> * deleter, std::allocator<T> * alloc)
{
(void) alloc;
deleter = new std::default_delete<T>;
if (!deleter) {
throw std::runtime_error("initialize_deleter failed");
}
}

template<typename Alloc, typename T>
void initialize_deleter(AllocatorDeleter<T, Alloc> * deleter, Alloc * alloc)
{
if (!alloc) {
throw std::invalid_argument("Allocator argument was NULL");
}
deleter = new AllocatorDeleter<T, Alloc>(alloc);
if (!deleter) {
throw std::runtime_error("initialize_deleter failed");
}
}

template<typename T, typename Alloc>
class AllocatorWrapper
{
public:
using Deleter = typename std::conditional<
std::is_same<Alloc, std::allocator<T>>::value,
std::default_delete<T>,
AllocatorDeleter<T, Alloc>
>::type;

AllocatorWrapper(Alloc * allocator)
: allocator_(allocator)
{
if (!allocator_) {
throw std::invalid_argument("Allocator argument was NULL");
}
initialize_deleter(deleter_, allocator_);
if (deleter_ == NULL) {
throw std::invalid_argument("Failed to initialize deleter");
}
}

AllocatorWrapper(Alloc * allocator, Deleter * deleter)
: allocator_(allocator), deleter_(deleter)
{
if (!allocator_) {
throw std::invalid_argument("Allocator argument was NULL");
}
if (!deleter_) {
throw std::invalid_argument("Deleter argument was NULL");
}
}

AllocatorWrapper(Alloc & allocator)
{
allocator_ = &allocator;
if (!allocator_) {
throw std::invalid_argument("Allocator argument was NULL");
}
initialize_deleter(deleter_, allocator_);
if (!deleter_) {
throw std::invalid_argument("Failed to initialize deleter");
}
}

AllocatorWrapper()
{
allocator_ = new Alloc();
initialize_deleter(deleter_, allocator_);
if (!deleter_) {
//throw std::invalid_argument("Failed to initialize deleter");
deleter_ = new Deleter;
}
}

T * allocate(size_t size)
{
return std::allocator_traits<Alloc>::allocate(*allocator_, size);
}

T * deallocate(void * pointer, size_t size)
{
std::allocator_traits<Alloc>::deallocate(*allocator_, pointer, size);
}

template<class ... Args>
void construct(T * pointer, Args && ... args)
{
std::allocator_traits<Alloc>::construct(*allocator_, pointer, std::forward<Args>(args)...);
}

Deleter * get_deleter() const
{
return deleter_;
}
Alloc * get_underlying_allocator() const
{
return allocator_;
}

private:
Alloc * allocator_;
Deleter * deleter_;
};

template<typename T>
using DefaultAllocator = AllocatorWrapper<T, std::allocator<T>>;

#endif
56 changes: 51 additions & 5 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,29 @@ namespace rclcpp
namespace any_subscription_callback
{

template<typename MessageT>
template<typename MessageT, typename Deleter = std::default_delete<MessageT>>
struct AnySubscriptionCallback
{
using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT> &)>;
using SharedPtrCallback = std::function<void(const std::shared_ptr<MessageT>)>;
using SharedPtrWithInfoCallback =
std::function<void(const std::shared_ptr<MessageT> &, const rmw_message_info_t &)>;
using UniquePtrCallback = std::function<void(std::unique_ptr<MessageT> &)>;
std::function<void(const std::shared_ptr<MessageT>, const rmw_message_info_t &)>;
using ConstSharedPtrCallback = std::function<void(const std::shared_ptr<const MessageT>)>;
using ConstSharedPtrWithInfoCallback =
std::function<void(const std::shared_ptr<const MessageT>, const rmw_message_info_t &)>;
using UniquePtrCallback = std::function<void(std::unique_ptr<MessageT, Deleter>)>;
using UniquePtrWithInfoCallback =
std::function<void(std::unique_ptr<MessageT> &, const rmw_message_info_t &)>;
std::function<void(std::unique_ptr<MessageT, Deleter>, const rmw_message_info_t &)>;

SharedPtrCallback shared_ptr_callback;
SharedPtrWithInfoCallback shared_ptr_with_info_callback;
ConstSharedPtrCallback const_shared_ptr_callback;
ConstSharedPtrWithInfoCallback const_shared_ptr_with_info_callback;
UniquePtrCallback unique_ptr_callback;
UniquePtrWithInfoCallback unique_ptr_with_info_callback;

AnySubscriptionCallback()
: shared_ptr_callback(nullptr), shared_ptr_with_info_callback(nullptr),
const_shared_ptr_callback(nullptr), const_shared_ptr_with_info_callback(nullptr),
unique_ptr_callback(nullptr), unique_ptr_with_info_callback(nullptr)
{}

Expand Down Expand Up @@ -80,8 +86,48 @@ struct AnySubscriptionCallback
>
void set(CallbackT callback)
{
static_assert(std::is_same<
typename function_traits<CallbackT>::template argument_type<1>,
const rmw_message_info_t &>::value,
"Passed incorrect argument type to callback, should be rmw_message_info_t");
shared_ptr_with_info_callback = callback;
}

template<typename CallbackT,
typename std::enable_if<
function_traits<CallbackT>::arity == 1
>::type * = nullptr,
typename std::enable_if<
std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::shared_ptr<const MessageT>
>::value
>::type * = nullptr
>
void set(CallbackT callback)
{
const_shared_ptr_callback = callback;
}

template<typename CallbackT,
typename std::enable_if<
function_traits<CallbackT>::arity == 2
>::type * = nullptr,
typename std::enable_if<
std::is_same<
typename function_traits<CallbackT>::template argument_type<0>,
typename std::shared_ptr<const MessageT>
>::value
>::type * = nullptr
>
void set(CallbackT callback)
{
static_assert(std::is_same<
typename function_traits<CallbackT>::template argument_type<1>,
const rmw_message_info_t &>::value,
"Passed incorrect argument type to callback, should be rmw_message_info_t");
const_shared_ptr_with_info_callback = callback;
}
/*
template<typename CallbackT,
typename std::enable_if<
Expand Down
1 change: 0 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ class Executor
"[rclcpp::error] take failed for subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
subscription->return_message(message);
}

static void
Expand Down
24 changes: 14 additions & 10 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_

#include <rclcpp/allocator_wrapper.hpp>
#include <rclcpp/mapped_ring_buffer.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/publisher.hpp>
Expand Down Expand Up @@ -188,9 +189,10 @@ class IntraProcessManager
* \param buffer_size if 0 (default) a size is calculated based on the QoS.
* \return an unsigned 64-bit integer which is the publisher's unique id.
*/
template<typename MessageT>
template<typename MessageT, typename AllocWrapper = DefaultAllocator<MessageT>>
uint64_t
add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0)
add_publisher(typename publisher::Publisher<MessageT, AllocWrapper>::SharedPtr publisher,
AllocWrapper * allocator = new AllocWrapper(), size_t buffer_size = 0)
{
auto id = IntraProcessManager::get_next_unique_id();
publishers_[id].publisher = publisher;
Expand All @@ -200,7 +202,9 @@ class IntraProcessManager
throw std::invalid_argument("the calculated buffer size is too large");
}
publishers_[id].sequence_number.store(0);
publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size);
publishers_[id].buffer =
mapped_ring_buffer::MappedRingBuffer<MessageT, AllocWrapper>::make_shared(size,
allocator);
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
return id;
}
Expand Down Expand Up @@ -246,11 +250,11 @@ class IntraProcessManager
* \param message the message that is being stored.
* \return the message sequence number.
*/
template<typename MessageT>
template<typename MessageT, typename AllocWrapper = DefaultAllocator<MessageT>>
uint64_t
store_intra_process_message(
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT> & message)
std::unique_ptr<MessageT, typename AllocWrapper::Deleter> & message)
{
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
Expand All @@ -260,7 +264,7 @@ class IntraProcessManager
// Calculate the next message sequence number.
uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
// Insert the message into the ring buffer using the message_seq to identify it.
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, AllocWrapper> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info.buffer);
bool did_replace = typed_buffer->push_and_replace(message_seq, message);
// TODO(wjwwood): do something when a message was displaced. log debug?
Expand Down Expand Up @@ -320,13 +324,13 @@ class IntraProcessManager
* \param requesting_subscriptions_intra_process_id the subscription's id.
* \param message the message typed unique_ptr used to return the message.
*/
template<typename MessageT>
template<typename MessageT, typename AllocWrapper = DefaultAllocator<MessageT>>
void
take_intra_process_message(
uint64_t intra_process_publisher_id,
uint64_t message_sequence_number,
uint64_t requesting_subscriptions_intra_process_id,
std::unique_ptr<MessageT> & message)
std::unique_ptr<MessageT, typename AllocWrapper::Deleter> & message)
{
message = nullptr;
PublisherInfo * info;
Expand Down Expand Up @@ -358,7 +362,7 @@ class IntraProcessManager
}
target_subs->erase(it);
}
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT, AllocWrapper> TypedMRB;
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info->buffer);
// Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
if (target_subs->size()) {
Expand Down Expand Up @@ -419,7 +423,7 @@ class IntraProcessManager

PublisherInfo() = default;

publisher::Publisher::WeakPtr publisher;
typename publisher::PublisherBase::WeakPtr publisher;
std::atomic<uint64_t> sequence_number;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
Expand Down
Loading