Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

While playing, the last recorded message cannot be sent if the size of the message is big. #571

Closed
Barry-Xu-2018 opened this issue Nov 25, 2020 · 21 comments
Labels
bug Something isn't working

Comments

@Barry-Xu-2018
Copy link
Contributor

Description

While playing, the last recorded message cannot be sent if the size of the message is big

Expected Behavior

Recorded messages all be sent while playing

Actual Behavior

The last recorded message cannot be sent successfully

To Reproduce

Make publisher and the size of message is bigger than 512KB.

  1. Execute publisher

  2. In another terminal, execute

    ros2 bag record -o /tmp/mytest /topic
    

    Wait for a while, and do ctrl+c to terminate record.

  3. Check the number of record by below command

    ros2 bag info /tmp/mytest
    

    At item Messages:, you will get the number of message recorded.

  4. Play recorded message
    Start subscriber and execute below command in another terminal

    ros2 bag play /tmp/mytest
    

You will find the number of received message is less than recorded message.
If print the sequence of message, you will find the last one isn't received in subscriber.

System (please complete the following information)

  • OS: Ubuntu focal
  • ROS 2 Distro: rolling
  • Version: master

Additional context

If you cannot reproduce this on your environment, you can increase the size of the message (such 2MB).

@Barry-Xu-2018 Barry-Xu-2018 added the bug Something isn't working label Nov 25, 2020
@Barry-Xu-2018
Copy link
Contributor Author

After checking, the cause is

void Rosbag2Transport::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
try {
auto transport_node =
setup_node(play_options.node_prefix, play_options.topic_remapping_options);
Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
player.play(play_options);
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {

Publisher is created and saved in player

void Player::prepare_publishers(const PlayOptions & options)
{
rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics = options.topics_to_filter;
reader_->set_filter(storage_filter);
auto topics = reader_->get_all_topics_and_types();
for (const auto & topic : topics) {
auto topic_qos = publisher_qos_for_topic(topic, topic_qos_profile_overrides_);
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(
topic.name, topic.type, topic_qos)));
}
}

Since sending a message is asynchronous, while the message is bigger, sending all messages will spend much time.
So if player is deconstructed (that is, publisher is deconstructed) and there are messages which aren't sent in DDS, these message will not be sent.

If modify codes as below, all recorded messages can be sent in my case.

void Rosbag2Transport::play(
  const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
  try {
    auto transport_node =
      setup_node(play_options.node_prefix, play_options.topic_remapping_options);
    Player player(reader_, transport_node);
    do {
      reader_->open(storage_options, {"", rmw_get_serialization_format()});
      player.play(play_options);
      using namespace std::chrono_literals;
      std::this_thread::sleep_for(2s);     // <== New add codes
    } while (rclcpp::ok() && play_options.loop);

  } catch (std::runtime_error & e) {
    ROSBAG2_TRANSPORT_LOG_ERROR("Failed to play: %s", e.what());
  }
}

Of course, this isn't the correct solution.
By now, I have no good idea on how to wait for the completion of sending all messages.
Does anyone have any idea for this issue ?

@Karsten1987
Copy link
Collaborator

@adamdbrw do you see a similar behavior during your performance evaluation?

@adamdbrw
Copy link
Collaborator

adamdbrw commented Nov 25, 2020

We didn't notice such behavior, but also our evaluation was almost completely focused on recording (for various reasons explained earlier).

This looks like we need to wait for confirmation of successful sending of last message before we exit destructor. I don't mind fixing this as long as all interested parties agree.

@Barry-Xu-2018
Copy link
Contributor Author

@adamdbrw

This looks like we need to wait for confirmation of successful sending of last message before we exit destructor.

Do you have any idea on how to check all message sending ?

@adamdbrw
Copy link
Collaborator

I would start with checking whether the publish is called at all for the last message - this can be easily debugged

@Barry-Xu-2018
Copy link
Contributor Author

I would start with checking whether the publish is called at all for the last message - this can be easily debugged

Share my check information.
I have checked it in my environment.
The codes at line 186 is called for the last message.

while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
std::this_thread::sleep_until(
start_time_ + std::chrono::duration_cast<std::chrono::nanoseconds>(
1.0 / rate * message.time_since_start));
if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
}
}

@emersonknapp
Copy link
Collaborator

I wonder - do you see this behavior across all 3 standard RMW implementations, or just with FastDDS?

@Barry-Xu-2018
Copy link
Contributor Author

@emersonknapp

I wonder - do you see this behavior across all 3 standard RMW implementations, or just with FastDDS?

Sorry for late reply.

I tried cyclonedds and rti connext. This problem doesn't occur for these DDS.

@adamdbrw
Copy link
Collaborator

adamdbrw commented Dec 14, 2020

@Barry-Xu-2018 the difference is likely because only FastDDS writer (for publishing) is asynchronous by default. While this issue should of course be resolved (we should wait in some way), you can also easily set FastDDS to use synchronous writing. This is described here https://discourse.ros.org/t/rmw-fast-dds-publication-mode-sync-vs-async-and-how-to-change-it/17153.

@Barry-Xu-2018
Copy link
Contributor Author

@adamdbrw

the difference is likely because only FastDDS writer (for publishing) is asynchronous by default. While this issue should of course be resolved (we should wait in some way), you can set FastDDS to use synchronous writing. This is described here https://discourse.ros.org/t/rmw-fast-dds-publication-mode-sync-vs-async-and-how-to-change-it/17153.

Thank you for your information.
Yeah. The root cause is that sending is asynchronous.
I know cyclonedds is synchronous. So it doesn't have this issue. rti connext maybe asynchronous by default.

I am interesting in how to fix this issue with asynchronous. But now I have no good idea.

@JaimeMartin
Copy link

JaimeMartin commented Dec 22, 2020

Hi @Barry-Xu-2018,

Here you have an API, you could use: wait_for_all_acked

https://github.com/eProsima/Fast-DDS/blob/master/include/fastrtps/publisher/Publisher.h#L134

And here you have how to expose the publisher to call this API:

https://github.com/ros2/demos/blob/master/demo_nodes_cpp_native/src/talker.cpp

@Barry-Xu-2018
Copy link
Contributor Author

@JaimeMartin

Here you have an API, you could use: wait_for_all_acked

https://github.com/eProsima/Fast-DDS/blob/master/include/fastrtps/publisher/Publisher.h#L134

And here you have how to expose the publisher to call this API:

https://github.com/ros2/demos/blob/master/demo_nodes_cpp_native/src/talker.cpp

Really appreciate your information.
Maybe it is better to provide uniform API from rcl layer.

@JaimeMartin
Copy link

Hi @Barry-Xu-2018

The wait_for_acknowledgments is part of the DDS Standard, both Fast DDS and RTI Connext DDS implement this Publisher/DataWriter method.

I think this use case justifies to expose the method through the RCL

@Barry-Xu-2018
Copy link
Contributor Author

Hi @JaimeMartin

The wait_for_acknowledgments is part of the DDS Standard, both Fast DDS and RTI Connext DDS implement this Publisher/DataWriter method.

I think this use case justifies to expose the method through the RCL

Totally agree.

@JaimeMartin
Copy link

Hi @Barry-Xu-2018

We can guide you through the process of exposing this method, it should be straightforward.

@Barry-Xu-2018
Copy link
Contributor Author

Barry-Xu-2018 commented Dec 23, 2020

Hi @JaimeMartin

We can guide you through the process of exposing this method, it should be straightforward.

Thank your very much for your support.

@EduPonz
Copy link

EduPonz commented Dec 23, 2020

Hi everyone,

I think exposing wait_for_acknowledgments in rclcpp/rlcpy should be just a matter of propagating the call all the way to the middleware. You'll just need convert rmw_time_t to Duration_t (example), and maybe the return from bool to some return code at your convenience.

It is indeed a very useful feature, no only for this particular use case, but also in general (for example for synchronizing publisher and subscription operations based on a command topic).

As a side note, in synchronous delivery you could experience the same behaviour if the message is lost somewhere in the network stack, so even in that case, you'd want to wait for the acknowledgement.

@Barry-Xu-2018
Copy link
Contributor Author

@JaimeMartin @EduPonz

I have created a issue in RMW ros2/rmw#295 to expose this interface wait_for_acknowledgments.
And hear comments from other members.

@fujitatomoya
Copy link
Contributor

@Barry-Xu-2018 all of the interfaces and APIs have been merged in mainline. would you mind taking care of this as well? probably we can provide the option via ros2 bag play to choose if wait_for_acknowledge needs to be issued or not? (@emersonknapp what do you think about having the option?)

@Barry-Xu-2018
Copy link
Contributor Author

@fujitatomoya

Okay. I will make a fix.

probably we can provide the option via ros2 bag play to choose if wait_for_acknowledge needs to be issued or not?

Yeah. It is better to be an option for user.

@emersonknapp
Copy link
Collaborator

It looks like this is closed by #951 (please reopen if that is not the case). By the way, yes, the option seems like a good idea to me. Good work 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

7 participants