Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/available capacity of ipm #2173

Merged
merged 10 commits into from
May 19, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class BufferImplementationBase

virtual void clear() = 0;
virtual bool has_data() const = 0;
virtual size_t available_capacity() const = 0;
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class IntraProcessBufferBase

virtual bool has_data() const = 0;
virtual bool use_take_shared_method() const = 0;
virtual size_t available_capacity() const = 0;
};

template<
Expand Down Expand Up @@ -143,6 +144,11 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
return std::is_same<BufferT, MessageSharedPtr>::value;
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

private:
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return is_full_();
}

/// Get the remaining capacity to store messages
/**
* This member function is thread-safe.
*
* \return the number of free capacity for new messages
*/
size_t available_capacity() const
{
std::lock_guard<std::mutex> lock(mutex_);
return available_capacity_();
}

void clear()
{
TRACEPOINT(rclcpp_ring_buffer_clear, static_cast<const void *>(this));
Expand Down Expand Up @@ -189,6 +201,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return size_ == capacity_;
}

/// Get the remaining capacity to store messages
/**
* This member function is not thread-safe.
*
* \return the number of free capacity for new messages
*/
inline size_t available_capacity_() const
{
return capacity_ - size_;
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ class IntraProcessManager
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
get_subscription_intra_process(uint64_t intra_process_subscription_id);

/// Return the lowest available capacity for all subscription buffers for a publisher id.
RCLCPP_PUBLIC
size_t
lowest_available_capacity(const uint64_t intra_process_publisher_id) const;

private:
struct SplittedSubscriptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

RCLCPP_PUBLIC
virtual
size_t
available_capacity() const = 0;

bool
is_ready(rcl_wait_set_t * wait_set) override = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff
return buffer_->use_take_shared_method();
}

size_t available_capacity() const override
{
return buffer_->available_capacity();
}

protected:
void
trigger_guard_condition() override
Expand Down
11 changes: 11 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

/// Return the lowest available capacity for all subscription buffers.
/**
* For intraprocess communication return the lowest buffer capacity for all subscriptions.
* If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t.
* On failure return 0.
* \return lowest buffer capacity for all subscriptions
*/
RCLCPP_PUBLIC
size_t
lowest_available_ipm_capacity() const;

/// Wait until all published messages are acknowledged or until the specified timeout elapses.
/**
* This method waits until all published messages are acknowledged by all matching
Expand Down
47 changes: 47 additions & 0 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,52 @@ IntraProcessManager::can_communicate(
return true;
}

size_t
IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const
{
size_t capacity = std::numeric_limits<size_t>::max();

auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
// Publisher is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling lowest_available_capacity for invalid or no longer existing publisher id");
return 0u;
}

if (publisher_it->second.take_shared_subscriptions.empty() &&
publisher_it->second.take_ownership_subscriptions.empty())
{
// no subscriptions available
return 0u;
}

auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id)
{
auto subscription_it = subscriptions_.find(intra_process_subscription_id);
if (subscription_it != subscriptions_.end()) {
auto subscription = subscription_it->second.lock();
if (subscription) {
capacity = std::min(capacity, subscription->available_capacity());
}
} else {
// Subscription is either invalid or no longer exists.
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Calling available_capacity for invalid or no longer existing subscription id");
}
};

for (const auto sub_id : publisher_it->second.take_shared_subscriptions) {
available_capacity(sub_id);
}

for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) {
available_capacity(sub_id);
}

return capacity;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there could be possibility that this returns std::numeric_limits<size_t>::max(). is that expected behavior? Or if publisher_it can be found, there must be corresponding subscription id in the list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's expected behavior. If not subscriptions are present, the buffer is "free" -> return max. value. To check for the number of (intra-process) subscriptions there is another method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so that means that publisher actually can write the maximum number of data in the buffer? i believe that what it means to return the maximum number here as interface, and what user application expects.

}
} // namespace experimental
} // namespace rclcpp
19 changes: 19 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,22 @@ std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoin

return network_flow_endpoint_vector;
}

size_t PublisherBase::lowest_available_ipm_capacity() const
{
if (!intra_process_is_enabled_) {
return 0u;
}

auto ipm = weak_ipm_.lock();

if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died for a publisher.");
return 0u;
}

return ipm->lowest_available_capacity(intra_process_publisher_id_);
}
72 changes: 72 additions & 0 deletions rclcpp/test/rclcpp/test_intra_process_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) {
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);
}

/*
Check the available buffer capacity while storing and consuming data from an intra-process
buffer.
The initial available buffer capacity should equal the buffer size.
Inserting a message should decrease the available buffer capacity by 1.
Consuming a message should increase the available buffer capacity by 1.
*/
TEST(TestIntraProcessBuffer, available_capacity) {
using MessageT = char;
using Alloc = std::allocator<void>;
using Deleter = std::default_delete<MessageT>;
using SharedMessageT = std::shared_ptr<const MessageT>;
using UniqueMessageT = std::unique_ptr<MessageT, Deleter>;
using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer<
MessageT, Alloc, Deleter, UniqueMessageT>;

constexpr auto history_depth = 5u;

auto buffer_impl =
std::make_unique<rclcpp::experimental::buffers::RingBufferImplementation<UniqueMessageT>>(
history_depth);

UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl));

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());

auto original_unique_msg = std::make_unique<char>('a');
auto original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
auto original_value = *original_unique_msg;

intra_process_buffer.add_unique(std::move(original_unique_msg));

EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());

SharedMessageT popped_shared_msg;
popped_shared_msg = intra_process_buffer.consume_shared();
auto popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_shared_msg.get());

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_shared_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);

original_unique_msg = std::make_unique<char>('b');
original_message_pointer = reinterpret_cast<std::uintptr_t>(original_unique_msg.get());
original_value = *original_unique_msg;

intra_process_buffer.add_unique(std::move(original_unique_msg));

auto second_unique_msg = std::make_unique<char>('c');
auto second_message_pointer = reinterpret_cast<std::uintptr_t>(second_unique_msg.get());
auto second_value = *second_unique_msg;

intra_process_buffer.add_unique(std::move(second_unique_msg));

EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity());

UniqueMessageT popped_unique_msg;
popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());

EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity());
EXPECT_EQ(original_value, *popped_unique_msg);
EXPECT_EQ(original_message_pointer, popped_message_pointer);

popped_unique_msg = intra_process_buffer.consume_unique();
popped_message_pointer = reinterpret_cast<std::uintptr_t>(popped_unique_msg.get());

EXPECT_EQ(history_depth, intra_process_buffer.available_capacity());
EXPECT_EQ(second_value, *popped_unique_msg);
EXPECT_EQ(second_message_pointer, popped_message_pointer);
}
Loading