
Add ability to confirm reader receive all sent messages #295

Closed
Barry-Xu-2018 opened this issue Dec 24, 2020 · 18 comments
Labels
enhancement New feature or request

Comments

@Barry-Xu-2018
Contributor

Feature request

Feature description

Developers can use this interface to confirm that readers have received all sent messages. For some use cases, this confirmation is very important.

wait_for_acknowledgments is defined in the DDS spec.

The description in the DDS spec:

2.2.2.4.1.12 wait_for_acknowledgments
This operation blocks the calling thread until either all data written by the reliable DataWriter entities is acknowledged by all matched reliable DataReader entities, or else the duration specified by the max_wait parameter elapses, whichever happens first. A return value of OK indicates that all the samples written have been acknowledged by all reliable matched data readers; a return value of TIMEOUT indicates that max_wait elapsed before all the data was acknowledged.
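To make these semantics concrete, here is a minimal Python sketch of the bookkeeping a reliable writer could keep in order to implement wait_for_acknowledgments. It is an in-process model, not any vendor's code: ReliableWriterModel, match_reader, on_ack and ReturnCode are hypothetical names, and "acknowledged" is modeled as each reader reporting the highest sequence number it has received so far.

```python
import threading
from enum import Enum


class ReturnCode(Enum):
    OK = "OK"
    TIMEOUT = "TIMEOUT"


class ReliableWriterModel:
    """Hypothetical model of a reliable DataWriter's acknowledgment bookkeeping."""

    def __init__(self):
        self._cond = threading.Condition()
        self._last_written = 0  # sequence number of the newest sample written
        self._acked = {}        # reader id -> highest sequence number acknowledged

    def match_reader(self, reader_id):
        with self._cond:
            self._acked[reader_id] = 0

    def write(self):
        with self._cond:
            self._last_written += 1
            return self._last_written

    def on_ack(self, reader_id, seq):
        # Called when a reader reports it has received everything up to `seq`.
        with self._cond:
            if reader_id in self._acked and seq > self._acked[reader_id]:
                self._acked[reader_id] = seq
                self._cond.notify_all()

    def _all_acked(self):
        # Caller must hold self._cond.
        return all(seq >= self._last_written for seq in self._acked.values())

    def wait_for_acknowledgments(self, max_wait):
        # Block until every matched reader has acked every written sample,
        # or until max_wait seconds elapse, whichever happens first.
        with self._cond:
            done = self._cond.wait_for(self._all_acked, timeout=max_wait)
        return ReturnCode.OK if done else ReturnCode.TIMEOUT
```

Using the predicate form of Condition.wait_for means the condition is re-checked after every wake-up, so a partial acknowledgment can never be reported as OK.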

Implementation considerations

If the DDS implementation exposes this interface (e.g. FastDDS), we can use it directly. If not, return unsupported.
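The "use it if the vendor has it, otherwise report unsupported" rule can be sketched as a thin dispatch function. This is a Python illustration only; the real rmw layer is written in C and its names differ. RetCode, forward_wait_for_acks and the two stand-in vendor classes are hypothetical, and the vendor method `wait_for_acknowledgments(timeout) -> bool` is an assumed shape.

```python
from enum import Enum, auto


class RetCode(Enum):
    OK = auto()
    TIMEOUT = auto()
    UNSUPPORTED = auto()


def forward_wait_for_acks(dds_writer, timeout):
    """Forward to the vendor's wait call if it exists, else report UNSUPPORTED."""
    vendor_wait = getattr(dds_writer, "wait_for_acknowledgments", None)
    if vendor_wait is None:
        return RetCode.UNSUPPORTED
    return RetCode.OK if vendor_wait(timeout) else RetCode.TIMEOUT


class VendorWithAcks:
    """Stand-in for a DDS writer that exposes an acknowledgment wait."""

    def __init__(self, all_acked):
        self._all_acked = all_acked

    def wait_for_acknowledgments(self, timeout):
        return self._all_acked


class VendorWithoutAcks:
    """Stand-in for a DDS writer with no such operation."""
```

Callers can then treat UNSUPPORTED as a distinct outcome rather than confusing it with a timeout.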

@Barry-Xu-2018
Contributor Author

@ivanpauno @clalancette

Do you have any comments on adding this interface?

@ivanpauno
Member

@Barry-Xu-2018 can you summarize the motivation for adding this, i.e., give some example use cases?

Is this standard DDS API?

@emersonknapp
Contributor

I believe the motivation is so that rosbag2 playback can finish sending any reliable messages before shutting itself down, rather than immediately shutting down after calling the final publish.

This API could possibly be overkill, though, as I believe currently FastDDS hasn't even had the chance to publish the message at the time of shutdown due to happening in a separate worker thread. I'm not familiar enough with the DDS APIs to say whether "all messages sent once" is a feature we could connect to, as that is probably closer to the desired behavior: "if I call publish, give me the opportunity to determine whether the message was actually published, before I shut down"

@Barry-Xu-2018
Contributor Author

Barry-Xu-2018 commented Jan 5, 2021

@ivanpauno

As @emersonknapp said, motivation is

rosbag2 playback can finish sending any reliable messages before shutting itself down, rather than immediately shutting down after calling the final publish.

This API provides a way to make sure that all subscribers have received all messages sent by the publisher (only if the publisher's QoS profile is set to RELIABLE).
Even if DDS is in synchronous mode, the return of a publish() call doesn't mean the message has actually been sent out (it may only have been put into the send history). If the publisher program exits at this point, nothing guarantees that all published messages have actually been sent.
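A toy model of why a returned publish() says nothing about the wire: publish() below only enqueues, and a background worker does the transmitting. AsyncPublisherModel and its transmit callback are hypothetical stand-ins, not FastDDS internals. Because the worker is a daemon thread, the Python interpreter stops it abruptly at exit, which mirrors a process exiting with messages still queued.

```python
import queue
import threading


class AsyncPublisherModel:
    """Hypothetical publisher whose publish() only enqueues; a worker sends."""

    def __init__(self, transmit):
        self._queue = queue.Queue()
        self._transmit = transmit  # callable that puts a message on the "wire"
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def publish(self, msg):
        # Returns immediately; nothing has necessarily been transmitted yet.
        self._queue.put(msg)

    def _run(self):
        while True:
            msg = self._queue.get()
            self._transmit(msg)
            self._queue.task_done()

    def flush(self):
        # Block until every queued message has been handed to transmit().
        self._queue.join()
```

Note that flush() only gives "everything left the queue", which says nothing about whether any reader received it.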

Is this standard DDS API?

Yes. See section 2.2.2.4.2.15, wait_for_acknowledgments, in DDS spec v1.4.

@ivanpauno
Member

This API could possibly be overkill, though, as I believe currently FastDDS hasn't even had the chance to publish the message at the time of shutdown due to happening in a separate worker thread

I'm not sure what this means, do you mean that wait_for_acknowledgments will only wait for acknowledgements of messages that the "writer thread" already started sending but might ignore sending messages that still are in the queue?

I think that in a reliable publisher, having a way to wait until all sent messages are actually sent is a good idea, but I'm not sure if wait_for_acknowledgments does exactly that.

@ivanpauno
Member

Checking the DDS spec, it seems that it means exactly that:

This operation blocks the calling thread until either all data written by the reliable DataWriter entities is acknowledged by all matched reliable DataReader entities, or else the duration specified by the max_wait parameter elapses, whichever happens first. A return value of OK indicates that all the samples written have been acknowledged by all reliable matched data readers; a return value of TIMEOUT indicates that max_wait elapsed before all the data was acknowledged.

So I second the proposal if somebody else also approves.
Maybe @wjwwood or @clalancette?

@emersonknapp
Contributor

This API could possibly be overkill, though, as I believe currently FastDDS hasn't even had the chance to publish the message at the time of shutdown due to happening in a separate worker thread

I'm not sure what this means, do you mean that wait_for_acknowledgments will only wait for acknowledgements of messages that the "writer thread" already started sending but might ignore sending messages that still are in the queue?

I think that in a reliable publisher, having a way to wait until all sent messages are actually sent is a good idea, but I'm not sure if wait_for_acknowledgments does exactly that.

What I'm trying to convey is:

  • The goal is for all messages to be sent. To me, this means that once the final network packet has been transmitted by the publishing process, we should be allowed to exit. I believe that this is not currently happening: e.g., we call rmw_publish, then exit the process before any network packets are actually sent, because the message is still sitting in some queue.
  • wait_for_acknowledgments is only satisfied when all messages have been received, meaning that all matched subscriptions have had the chance to send back an ack (or a timeout occurs). This is a more strict requirement than the preceding point. Maybe that's OK with us; I just wanted to make sure we acknowledge the difference before going ahead with it. If the timeout is short, then maybe it is not important in practice. What is the default max_wait?
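The difference between the two bullets can be stated as two predicates over sequence numbers. This is a simplified sketch assuming samples are numbered consecutively from 1 and each reader reports a cumulative ack; all_sent and all_acked are hypothetical names.

```python
def all_sent(last_written, last_transmitted):
    """Weaker condition: every written sample has left the process at least once."""
    return last_transmitted >= last_written


def all_acked(last_written, acked_by_reader):
    """Stricter condition (what wait_for_acknowledgments waits for): every
    matched reliable reader has acknowledged every written sample."""
    return all(seq >= last_written for seq in acked_by_reader.values())
```

With 5 samples written and transmitted but one reader having acknowledged only up to 3, `all_sent(5, 5)` holds while `all_acked(5, {"r1": 5, "r2": 3})` does not, which is why the acknowledgment-based wait is the stricter one. With no matched readers, all_acked is trivially true.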

@ivanpauno
Copy link
Member

Thanks for clarifying @emersonknapp, the proposal sounds good to me.

This is a more strict requirement than the preceding point.

I really don't see the point of the less strict version, so I like what DDS provides in this case.

@fujitatomoya
Collaborator

@emersonknapp @Barry-Xu-2018

I believe that wait_for_acknowledgments only works for reliable messages, not for best-effort ones. So even when using wait_for_acknowledgments, it cannot guarantee that publishing has finished before the process exits when async sending is used. In other words, I think ros2/rosbag2#571 remains when the QoS reliability kind is best effort.

  • If the rosbag2 play QoS reliability kind is reliable (as recorded in the first place) or is overridden to reliable, it is worth calling wait_for_acknowledgments to make sure all of the messages are delivered to the receivers before the process exits.
  • If the rosbag2 play QoS reliability kind is best effort (recorded or overridden), wait_for_acknowledgments does nothing, so the process could exit before the message is sent if async sending is used. In this case, I think we eventually do need synchronous sending to make sure the message is on the network.

@ivanpauno
Copy link
Member

@fujitatomoya thanks, I got that from @emersonknapp's comment.
But really, that's what I would expect: there's not much difference between "the message didn't arrive because the network is unreliable" and "the message didn't arrive because it was waiting in a queue and was never sent".

IIUC, things like shutdown() or the SO_LINGER socket option don't do anything for UDP sockets. So there is also the case "I sent everything that was queued, but the message was never actually sent out because it was in a kernel buffer", and IIUC that problem cannot be solved (for UDP sockets).

@Barry-Xu-2018
Contributor Author

@wjwwood @clalancette

What do you think about this new interface?

@anamud
Contributor

anamud commented Mar 24, 2021

Thanks for bringing attention to this in the middleware workgroup meeting. I have a couple of questions to clarify the idea.

  • The number of matched subscribers can change constantly, since it is affected by the liveliness and durability QoS policies. Can the DDS system on the publisher side latch the number of matched subscribers when wait_for_acknowledgments is called? If it does, isn't such latching problematic?
  • What is the problem with building applications on faith that, when reliability QoS is set to reliable, the DDS system on the publisher side will send out messages reliably and the DDS system on the matched subscriber side will receive them reliably? In your specific example, I do not see why rosbag2 cannot blindly believe that messages have been received.
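One possible answer to the latching question, sketched as a model: instead of latching the reader set, the tracker follows matching changes while a wait is in progress. This is an illustration of a design choice, not a description of what any DDS vendor does; AckTrackerModel, match, unmatch and wait_all_acked are hypothetical names.

```python
import threading


class AckTrackerModel:
    """Hypothetical ack tracker that follows matching changes while waiting."""

    def __init__(self, last_written):
        self._cond = threading.Condition()
        self._last_written = last_written
        self._acked = {}  # reader id -> highest acknowledged sequence number

    def match(self, reader_id, acked=0):
        with self._cond:
            self._acked[reader_id] = acked
            self._cond.notify_all()

    def unmatch(self, reader_id):
        # A reader that is gone (e.g. lost liveliness) can never ack, so stop
        # waiting for it instead of keeping it latched until the timeout.
        with self._cond:
            self._acked.pop(reader_id, None)
            self._cond.notify_all()

    def wait_all_acked(self, timeout):
        # Returns True if every currently matched reader acked in time.
        with self._cond:
            return self._cond.wait_for(
                lambda: all(s >= self._last_written for s in self._acked.values()),
                timeout=timeout,
            )
```

Conversely, matching a new reader (for example a late joiner) makes the predicate false again, so an ongoing wait would also cover it; the timeout bounds both cases.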

@fujitatomoya
Collaborator

I am not sure whether the DataWriter can latch the matched peer DataReaders, but this race condition is always there. My understanding is that, if QoS is reliable, wait_for_acknowledgments can make sure the message is delivered to the DataReaders matched at that moment. The API also provides a timeout option, which probably addresses your concern.

Even if the QoS is reliable, the process (in this case rosbag2 play) shuts down immediately after sending the last message. If the sending is async, the very last message might not even be on the transport yet when the process exits. Internally, DDS tries to guarantee that the last message is delivered to the DataReader, but the process itself is gone. So I think there is nothing DDS can do about it, because the application wants to shut down the process.

From the previous WG discussion, we see a few options for the rosbag2 case:

  • Before shutdown, call wait_for_acknowledgments after all messages have been sent. (If QoS is reliable, this can make sure the last message is delivered, but it is not effective for best-effort QoS.)
  • An option to sleep X seconds before shutdown. (No guarantee, but this can also be effective if QoS is best effort.)
  • An option to wait for the Ctrl-C signal via the CLI. (No guarantee, but this can also be effective if QoS is best effort.)
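The first two options combine into one shutdown step that branches on the publisher's reliability. A sketch only: finish_playback, its parameters and the wait_for_acks callable (standing in for the middleware call) are hypothetical, and option 3 (waiting for Ctrl-C) is left out. The sleep function is injectable so the branch can be tested without actually sleeping.

```python
import time


def finish_playback(wait_for_acks, reliable, ack_timeout, linger=0.0, sleep=time.sleep):
    """Hypothetical end-of-playback step covering the first two options.

    wait_for_acks(timeout) -> bool is only meaningful for reliable publishers.
    `linger` is the optional best-effort sleep in seconds.
    Returns a short string describing what was done.
    """
    if reliable:
        # Option 1: bounded wait until matched reliable readers have acked.
        return "acked" if wait_for_acks(ack_timeout) else "ack timeout"
    # Best effort: nobody acknowledges, so waiting for acks cannot help.
    if linger > 0:
        sleep(linger)  # Option 2: no guarantee, just gives queued sends a chance
        return "lingered"
    return "exit immediately"
```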

I may be missing something; please correct me if I got anything wrong ❗

Aside from rosbag2, I believe there are other use cases for calling wait_for_acknowledgments. For example, we have an application that uses topics for emergency and attention levels, and most of the system nodes consume them via pub/sub. If perception/recognition leads to a level change, the system nodes need to be notified so that they can change the system behavior.

@Barry-Xu-2018
Contributor Author

@ivanpauno @fujitatomoya

Please help review the rmw implementation.

@Barry-Xu-2018
Contributor Author

Barry-Xu-2018 commented May 11, 2021

@fujitatomoya
Collaborator

@Barry-Xu-2018 thanks 👍

As we discussed offline before, we can probably call this wait_for_all_acked in ros2 bag play? probably calling wait_for_all_acked based on rclcpp/rclpy and also call optional sleep since wait_for_all_acked does nothing if QoS is BestEffort.

It would be better to discuss this on ros2/rosbag2#571 with @emersonknapp @Karsten1987

@ivanpauno
Member

probably calling wait_for_all_acked based on rclcpp/rclpy and also call optional sleep since wait_for_all_acked does nothing if QoS is BestEffort.

It's a decision for rosbag, but I think sleep() should never be called to work around this issue.
If the publisher is best effort, then whether a message is lost shouldn't matter (if that matters, the publisher shouldn't be best effort).

@fujitatomoya
Collaborator

I will close this issue since the related system interfaces and APIs have been merged into mainline.


5 participants