How do I wait for closure of a tokio::net::TcpStream? #483

Closed
vi opened this issue Jul 14, 2018 · 17 comments

@vi
Contributor

vi commented Jul 14, 2018

Is there something like a future that resolves when a given TCP socket is closed, without writing anything to or reading anything from that socket?

@kpp
Contributor

kpp commented Jul 20, 2018

@tobz
Member

tobz commented Aug 1, 2018

@vi Were you able to work through the issue after taking a peek at that example code?

Would be good to close this out if so. 👍

@vi
Contributor Author

vi commented Aug 1, 2018

This example actively reads bytes from the socket.

My use case is waiting for closure of a backpressured TCP socket without reading anything from it.

I'm also interested in being notified when the socket stops being writable (the peer doing shutdown(SHUT_RD)) while it still supplies bytes for reading, without actually writing any stray bytes to it (just to check whether a write would error or not).

Imagine a client submits heavy requests to us, which we reply to asynchronously when ready. Requests are rate-limited by limiting reads from the socket (backpressure). But we want to abort processing of the current request and drop all pending requests in the socket's read buffer if the socket is closed (the client is gone).

@vi
Contributor Author

vi commented Aug 1, 2018

Shall I make a modified print_each_packet example illustrating the problem?

@tobz
Member

tobz commented Aug 1, 2018

I think having an example of the problem would be very helpful, yes!

@vi
Contributor Author

vi commented Aug 1, 2018

Here is the modified example: https://gist.github.com/vi/cdf2cacf01b82b54ae630c6c4cde921b

What should be filled in for the wait_for_socket_closed_future?

Pressing Ctrl+C in the connected netcat/socat should immediately abort the "calculation" (including the one currently running), not after 1-2 more entries.

@tobz
Member

tobz commented Aug 1, 2018

Yeah, I'm not really sure you can achieve what you want with that code structure.

Notifications on your socket should be serialized, IIRC; i.e. if you have data waiting, you're essentially required to process it before the end of the stream can occur, "break out", and let you get to the "socket closed" code.

In that sense, you can't detect the condition until you've worked through the data that's already there.

The best I can think of is to run a separate processing task that you communicate with via an mpsc channel, feeding it the items to process, and to give that task the receiving side of a oneshot channel. This way, your normal framed stream is simply handing off messages as fast as it can, so if the stream closes you get to the "socket closed" combinator a little faster; there you send a signal via the oneshot, and your processing task selects on that future to short-circuit the processing.

I hacked up your example, and it likely doesn't compile, but I think it illustrates my point: https://gist.github.com/tobz/5b8540ff33f03e307f1049503b266f89
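A minimal sketch of that two-task split, written here against current tokio's async/await API for readability rather than the futures 0.1 combinators the gist uses; `handle_client` is a hypothetical per-connection handler, and the fixed-size read and the sleep are stand-ins for the framing and the heavy per-request work:

```rust
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot};

async fn handle_client(mut socket: TcpStream) {
    // Capacity 1 keeps the hand-off bounded: at most one chunk sits in the channel.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(1);
    let (close_tx, mut close_rx) = oneshot::channel::<()>();

    // Receiving task: hands chunks off to the processor and signals closure.
    let reader = tokio::spawn(async move {
        let mut buf = [0u8; 4096];
        loop {
            match socket.read(&mut buf).await {
                Ok(0) | Err(_) => break, // EOF or error: the peer is gone
                Ok(n) => {
                    if tx.send(buf[..n].to_vec()).await.is_err() {
                        break; // processor went away
                    }
                }
            }
        }
        let _ = close_tx.send(()); // the "socket closed" signal
    });

    // Processing task: does the heavy work, but aborts as soon as closure is signalled.
    let processor = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = &mut close_rx => break, // short-circuit on close
                chunk = rx.recv() => {
                    match chunk {
                        Some(c) => {
                            // Stand-in for the expensive per-request work.
                            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                            println!("processed {} bytes", c.len());
                        }
                        None => break,
                    }
                }
            }
        }
    });

    let _ = tokio::join!(reader, processor);
}
```

Note that while `tx.send` is waiting for channel capacity the reader is not reading from the socket, which is the tension with backpressure raised in the next comment.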

@vi
Contributor Author

vi commented Aug 1, 2018

"handling off messages as fast as it can" contradicts with "backpressure". I need both backpressure and instant-abort.

To do it for real, it should be supported at syscall level (handling third event "hangup", not just "read" and "write"). I see such specifier in mio for Unix platform.

Also where is the interrupt flag in your example on gist? How do I abort already running thing on TCP RST or FIN packet in your scheme?

@tobz
Member

tobz commented Aug 2, 2018

We should probably take this to the gitter chat; it'd be easier than the back-and-forth over a GH issue.

At any rate, the example doesn't actually receive as fast as it can: it uses a bounded channel that stores at most one message at a time, so one message can be processing and one can be sent (but not yet received by the processor) before the receiving side of the server stops taking in data. The point of my statement was to draw your attention to the split nature of the tasks, letting one do the processing and the other do the receiving, so you can accurately capture the close signal by way of the combinator being executed.

If you read the example, you can see I'm sending a message over close_tx in the and_then call, which represents the socket closing. That message causes the receiver -- close_rx -- to complete, which, when selected on, breaks out of accepting messages in the processor. You could simply clone that receiver and also poll it within your processing code.

This is getting outside my range of knowledge when it comes to the precise order of operations for a hangup being detected while a task is actively reading from the RX buffer of a TCP socket. Others might know.

@vi
Contributor Author

vi commented Aug 2, 2018

I implemented something like this myself: https://gist.github.com/vi/65e2c68fbed3baacce2a18b56d875070
It is actually able to swiftly interrupt the job when the socket goes away.

It relies on tokio_tcp::TcpStream::try_clone, which does not seem to be available on crates.io yet.

The trick (actually hinted at by somebody on IRC) is to use poll_read_ready(Ready::empty()). It remains NotReady even when there is data available for reading (because the set of events being waited for is empty) and only becomes Ready on hangup (because the HUP event cannot be ignored in epoll_ctl and is delivered even with an empty waiting set).

Is this a reasonably correct way to do it? Should something like this be in Tokio itself? Will it work on Windows?
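For reference, a minimal sketch of that trick against tokio 0.1 / futures 0.1 / mio 0.6 (`wait_for_hup` and `socket2` are illustrative names; `socket2` is assumed to be the extra handle obtained via the not-yet-released `try_clone`):

```rust
use futures::{future::poll_fn, Future};
use mio::Ready;
use std::io;
use tokio::net::TcpStream;

// Resolves only on hangup: with an empty interest set, buffered data never
// wakes this future, but epoll reports HUP regardless of the requested interest.
fn wait_for_hup(socket2: TcpStream) -> impl Future<Item = (), Error = io::Error> {
    poll_fn(move || {
        socket2
            .poll_read_ready(Ready::empty())
            .map(|ready| ready.map(|_| ()))
    })
}
```

The returned future can then be raced (e.g. via `select`) against the normal request-processing future.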


P.S. I have now joined the Tokio room on Gitter again.

@tobz
Member

tobz commented Aug 3, 2018

Nice! That's an interesting approach.

I'm pretty sure @carllerche has mentioned that hangups are very platform-dependent and aren't available consistently, which would mean there's no good way for mio/tokio to expose them.

@vi
Contributor Author

vi commented Aug 3, 2018

This approach also duplicates the file descriptor because of try_clone. It is not a good idea to spend two FDs per client on a loaded system.

epoll_wait tells you when a socket hangs up (the EPOLLHUP bit set in epoll_event.events). Mio/Tokio's job is to deliver this information to my program without "spilling it away".

I'm not sure about Windows, though. "Always keep an overlapped read pending" seems to contradict backpressure.

@carllerche
Member

You can ask to get notified on HUP by calling poll_read_ready and passing Ready::hup() as the interest.

Hopefully this works. I'm going to close the issue, but feel free to continue commenting.
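In terms of the sketch a few comments up, that would just mean changing the interest mask (assuming the mio version in use still exposes `Ready::hup()`; later mio releases spell it `mio::unix::UnixReady::hup()`):

```rust
// Variant of the earlier wait_for_hup sketch: ask for HUP explicitly rather
// than relying on an empty interest set.
fn wait_for_hup(socket2: TcpStream) -> impl Future<Item = (), Error = io::Error> {
    poll_fn(move || {
        socket2
            .poll_read_ready(Ready::hup())
            .map(|ready| ready.map(|_| ()))
    })
}
```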

@vi
Contributor Author

vi commented Aug 8, 2018

How do I do that without cloning the socket? I want both the normal future and the hup-only future to use the same registration.

@vi
Contributor Author

vi commented Nov 16, 2020

poll_read_ready seems to be gone from Tokio 0.3.

How do I now watch for incoming RSTs on TcpStreams without reading data from them (and hence provoking the peer into sending more data, breaking backpressure)?

@vi
Contributor Author

vi commented Nov 16, 2020

Maybe Tokio should maintain a correct TCP port forwarder example, with backpressure, with correct shutdown(2) handling, with correct propagation of connection resets, without externally visible buffering?

@Darksonn
Contributor

Please open a new issue (or perhaps a discussion).
