Skip to content

Commit

Permalink
Use message_info in SubscriptionTopicStatistics instead of typed message
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Oct 14, 2023
1 parent 7f7ffcf commit 9d93748
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 50 deletions.
17 changes: 8 additions & 9 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ template<
typename SubscriptionT,
typename MessageMemoryStrategyT,
typename NodeParametersT,
typename NodeTopicsT,
typename ROSMessageType = typename SubscriptionT::ROSMessageType>
typename NodeTopicsT
>
typename std::shared_ptr<SubscriptionT>
create_subscription(
NodeParametersT & node_parameters,
Expand All @@ -70,7 +70,7 @@ create_subscription(
using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics_interface = get_node_topics_interface(node_topics);

std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>
subscription_topic_stats = nullptr;

if (rclcpp::detail::resolve_enable_topic_statistics(
Expand All @@ -80,8 +80,7 @@ create_subscription(
if (options.topic_stats_options.publish_period <= std::chrono::milliseconds(0)) {
throw std::invalid_argument(
"topic_stats_options.publish_period must be greater than 0, specified value of " +
std::to_string(options.topic_stats_options.publish_period.count()) +
" ms");
std::to_string(options.topic_stats_options.publish_period.count()) + " ms");
}

std::shared_ptr<Publisher<statistics_msgs::msg::MetricsMessage>>
Expand All @@ -91,12 +90,12 @@ create_subscription(
options.topic_stats_options.publish_topic,
qos);

subscription_topic_stats = std::make_shared<
rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>
>(node_topics_interface->get_node_base_interface()->get_name(), publisher);
subscription_topic_stats =
std::make_shared<rclcpp::topic_statistics::SubscriptionTopicStatistics>(
node_topics_interface->get_node_base_interface()->get_name(), publisher);

std::weak_ptr<
rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>
rclcpp::topic_statistics::SubscriptionTopicStatistics
> weak_subscription_topic_stats(subscription_topic_stats);
auto sub_call_back = [weak_subscription_topic_stats]() {
auto subscription_topic_stats = weak_subscription_topic_stats.lock();
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class Subscription : public SubscriptionBase

private:
using SubscriptionTopicStatisticsSharedPtr =
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>;
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>;

public:
RCLCPP_SMART_PTR_DEFINITIONS(Subscription)
Expand Down Expand Up @@ -316,7 +316,7 @@ class Subscription : public SubscriptionBase
if (subscription_topic_statistics_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
const auto time = rclcpp::Time(nanos.time_since_epoch().count());
subscription_topic_statistics_->handle_message(*typed_message, time);
subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
}
}

Expand Down Expand Up @@ -357,7 +357,7 @@ class Subscription : public SubscriptionBase
if (subscription_topic_statistics_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
const auto time = rclcpp::Time(nanos.time_since_epoch().count());
subscription_topic_statistics_->handle_message(*typed_message, time);
subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time);
}
}

Expand Down
5 changes: 2 additions & 3 deletions rclcpp/include/rclcpp/subscription_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ template<
typename CallbackT,
typename AllocatorT,
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType,
typename ROSMessageType = typename SubscriptionT::ROSMessageType
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType
>
SubscriptionFactory
create_subscription_factory(
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<ROSMessageType>>
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics>
subscription_topic_stats = nullptr
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,12 @@ using libstatistics_collector::moving_average_statistics::StatisticData;
/**
* Class used to collect, measure, and publish topic statistics data. Current statistics
* supported for subscribers are received message age and received message period.
*
* \tparam CallbackMessageT the subscribed message type
*/
template<typename CallbackMessageT>
*/
class SubscriptionTopicStatistics
{
using TopicStatsCollector =
libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector<
CallbackMessageT>;
using ReceivedMessageAge =
libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector<
CallbackMessageT>;
using ReceivedMessagePeriod =
libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector<
CallbackMessageT>;
using TopicStatsCollector = libstatistics_collector::TopicStatisticsCollector;
using ReceivedMessageAge = libstatistics_collector::ReceivedMessageAgeCollector;
using ReceivedMessagePeriod = libstatistics_collector::ReceivedMessagePeriodCollector;

public:
/// Construct a SubscriptionTopicStatistics object.
Expand Down Expand Up @@ -101,16 +92,16 @@ class SubscriptionTopicStatistics
/**
* This method acquires a lock to prevent race conditions to collectors list.
*
* \param received_message the message received by the subscription
* \param message_info the message info corresponding to the received message
* \param now_nanoseconds current time in nanoseconds
*/
virtual void handle_message(
const CallbackMessageT & received_message,
const rmw_message_info_t & message_info,
const rclcpp::Time now_nanoseconds) const
{
std::lock_guard<std::mutex> lock(mutex_);
for (const auto & collector : subscriber_statistics_collectors_) {
collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds());
collector->OnMessageReceived(message_info, now_nanoseconds.nanoseconds());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,14 @@ using libstatistics_collector::moving_average_statistics::StatisticData;

/**
* Wrapper class to test and expose parts of the SubscriptionTopicStatistics<T> class.
* \tparam CallbackMessageT
*/
template<typename CallbackMessageT>
class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics<CallbackMessageT>
class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics
{
public:
TestSubscriptionTopicStatistics(
const std::string & node_name,
rclcpp::Publisher<statistics_msgs::msg::MetricsMessage>::SharedPtr publisher)
: SubscriptionTopicStatistics<CallbackMessageT>(node_name, publisher)
: SubscriptionTopicStatistics(node_name, publisher)
{
}

Expand All @@ -95,7 +93,7 @@ class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics<Callb
/// Exposed for testing
std::vector<StatisticData> get_current_collector_data() const
{
return SubscriptionTopicStatistics<CallbackMessageT>::get_current_collector_data();
return SubscriptionTopicStatistics::get_current_collector_data();
}
};

Expand Down Expand Up @@ -277,12 +275,12 @@ class MessageWithHeaderSubscriber : public rclcpp::Node
class TestSubscriptionTopicStatisticsFixture : public ::testing::Test
{
protected:
void SetUp()
void SetUp() override
{
rclcpp::init(0 /* argc */, nullptr /* argv */);
}

void TearDown()
void TearDown() override
{
rclcpp::shutdown();
}
Expand Down Expand Up @@ -389,7 +387,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction)
10);

// Construct a separate instance
auto sub_topic_stats = std::make_unique<TestSubscriptionTopicStatistics<Empty>>(
auto sub_topic_stats = std::make_unique<TestSubscriptionTopicStatistics>(
empty_subscriber->get_name(),
topic_stats_publisher);

Expand Down Expand Up @@ -432,9 +430,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no
ex.add_node(empty_subscriber);

// Spin and get future
ex.spin_until_future_complete(
statistics_listener->GetFuture(),
kTestTimeout);
ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout);

// Compare message counts, sample count should be the same as published and received count
EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived());
Expand All @@ -460,15 +456,8 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no
EXPECT_EQ(kNumExpectedMessagePeriodMessages, message_period_count);

// Check the collected statistics for message period.
// Message age statistics will not be calculated because Empty messages
// don't have a `header` with timestamp. This means that we expect to receive a `message_age`
// and `message_period` message for each empty message published.
for (const auto & msg : received_messages) {
if (msg.metrics_source == kMessageAgeSourceLabel) {
check_if_statistics_message_is_empty(msg);
} else if (msg.metrics_source == kMessagePeriodSourceLabel) {
check_if_statistic_message_is_populated(msg);
}
check_if_statistic_message_is_populated(msg);
}
}

Expand Down

0 comments on commit 9d93748

Please sign in to comment.