Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document cancellation safety #3900

Merged
merged 2 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions tokio/src/io/util/async_read_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ cfg_io_util! {
/// async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>;
/// ```
///
/// This function does not provide any guarantees about whether it
/// completes immediately or asynchronously
/// This method does not provide any guarantees about whether it
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
/// completes immediately or asynchronously, however the method is
/// cancellation safe, so it can safely be used as the event in a
/// [`select!`](crate::select) statement without loss of data.
///
/// # Return
///
Expand Down Expand Up @@ -177,8 +179,10 @@ cfg_io_util! {
/// Usually, only a single `read` syscall is issued, even if there is
/// more space in the supplied buffer.
///
/// This function does not provide any guarantees about whether it
/// completes immediately or asynchronously
/// This method does not provide any guarantees about whether it
/// completes immediately or asynchronously, however the method is
/// cancellation safe, so it can safely be used as the event in a
/// [`select!`](crate::select) statement without loss of data.
///
/// # Return
///
Expand Down Expand Up @@ -246,6 +250,9 @@ cfg_io_util! {
/// This function reads as many bytes as necessary to completely fill
/// the specified buffer `buf`.
///
/// This method is not cancellation safe, and any partially read data is
/// lost on cancellation.
///
/// # Errors
///
/// If the operation encounters an "end of file" before completely
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/io/util/async_write_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ cfg_io_util! {
/// error. A call to `write` represents *at most one* attempt to write to
/// any wrapped object.
///
/// This method is cancellation safe in the sense that if it is used as
/// the event in a [`tokio::select!`](crate::select) statement and some
/// other branch completes first, then it is guaranteed that no data was
/// written to this `AsyncWrite`.
///
/// # Return
///
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
Expand Down Expand Up @@ -129,6 +134,11 @@ cfg_io_util! {
///
/// See [`AsyncWrite::poll_write_vectored`] for more details.
///
/// This method is cancellation safe in the sense that if it is used as
/// the event in a [`tokio::select!`](crate::select) statement and some
/// other branch completes first, then it is guaranteed that no data was
/// written to this `AsyncWrite`.
///
/// # Examples
///
/// ```no_run
Expand Down Expand Up @@ -178,6 +188,11 @@ cfg_io_util! {
/// A call to `write_buf` represents *at most one* attempt to write to any
/// wrapped object.
///
/// This method is cancellation safe in the sense that if it is used as
/// the event in a [`tokio::select!`](crate::select) statement and some
/// other branch completes first, then it is guaranteed that no data was
/// written to this `AsyncWrite`.
///
/// # Return
///
/// If the return value is `Ok(n)` then it must be guaranteed that `n <=
Expand Down Expand Up @@ -300,6 +315,12 @@ cfg_io_util! {
/// has been successfully written or such an error occurs. The first
/// error generated from this method will be returned.
///
/// This method is not cancellation safe. If it is used as the event
/// in a [`tokio::select!`](crate::select) statement and some other
/// branch completes first, then the provided buffer may have been
/// partially written, but future calls to `write_all` will start over
/// from the beginning of the buffer.
///
/// # Errors
///
/// This function will return the first error that [`write`] returns.
Expand Down
214 changes: 131 additions & 83 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
/// returns the result of evaluating the completed branch's `<handler>`
/// expression.
///
/// Additionally, each branch may include an optional `if` precondition. This
/// precondition is evaluated **before** the `<async expression>`. If the
/// precondition returns `false`, the branch is entirely disabled. This
/// capability is useful when using `select!` within a loop.
/// Additionally, each branch may include an optional `if` precondition. If the
/// precondition returns `false`, then the branch is disabled. The provided
/// `<async expression>` is still evaluated but the resulting future is never
/// polled. This capability is useful when using `select!` within a loop.
///
/// The complete lifecycle of a `select!` expression is as follows:
///
Expand All @@ -42,12 +42,10 @@
/// to the provided `<pattern>`, if the pattern matches, evaluate `<handler>`
/// and return. If the pattern **does not** match, disable the current branch
/// and for the remainder of the current call to `select!`. Continue from step 3.
/// 5. If **all** branches are disabled, evaluate the `else` expression. If none
/// is provided, panic.
/// 5. If **all** branches are disabled, evaluate the `else` expression. If no
/// else branch is provided, panic.
///
/// # Notes
///
/// ### Runtime characteristics
/// # Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
Expand All @@ -58,76 +56,7 @@
///
/// [`tokio::spawn`]: crate::spawn
///
/// ### Avoid racy `if` preconditions
///
/// Given that `if` preconditions are used to disable `select!` branches, some
/// caution must be used to avoid missing values.
///
/// For example, here is **incorrect** usage of `sleep` with `if`. The objective
/// is to repeatedly run an asynchronous task for up to 50 milliseconds.
/// However, there is a potential for the `sleep` completion to be missed.
///
/// ```no_run
/// use tokio::time::{self, Duration};
///
/// async fn some_async_work() {
/// // do work
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let sleep = time::sleep(Duration::from_millis(50));
/// tokio::pin!(sleep);
///
/// while !sleep.is_elapsed() {
/// tokio::select! {
/// _ = &mut sleep, if !sleep.is_elapsed() => {
/// println!("operation timed out");
/// }
/// _ = some_async_work() => {
/// println!("operation completed");
/// }
/// }
/// }
/// }
/// ```
///
/// In the above example, `sleep.is_elapsed()` may return `true` even if
/// `sleep.poll()` never returned `Ready`. This opens up a potential race
/// condition where `sleep` expires between the `while !sleep.is_elapsed()`
/// check and the call to `select!` resulting in the `some_async_work()` call to
/// run uninterrupted despite the sleep having elapsed.
///
/// One way to write the above example without the race would be:
///
/// ```
/// use tokio::time::{self, Duration};
///
/// async fn some_async_work() {
/// # time::sleep(Duration::from_millis(10)).await;
/// // do work
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let sleep = time::sleep(Duration::from_millis(50));
/// tokio::pin!(sleep);
///
/// loop {
/// tokio::select! {
/// _ = &mut sleep => {
/// println!("operation timed out");
/// break;
/// }
/// _ = some_async_work() => {
/// println!("operation completed");
/// }
/// }
/// }
/// }
/// ```
///
/// ### Fairness
/// # Fairness
///
/// By default, `select!` randomly picks a branch to check first. This provides
/// some level of fairness when calling `select!` in a loop with branches that
Expand All @@ -151,10 +80,58 @@
///
/// # Panics
///
/// `select!` panics if all branches are disabled **and** there is no provided
/// `else` branch. A branch is disabled when the provided `if` precondition
/// returns `false` **or** when the pattern does not match the result of `<async
/// expression>`.
/// The `select!` macro panics if all branches are disabled **and** there is no
/// provided `else` branch. A branch is disabled when the provided `if`
/// precondition returns `false` **or** when the pattern does not match the
/// result of `<async expression>`.
///
/// # Cancellation safety
///
/// When using `select!` in a loop to receive messages from multiple sources,
/// you should make sure that the receive call is cancellation safe to avoid
/// losing messages. The lists in this section are not exhaustive.
///
/// The following methods are cancellation safe:
///
/// * [`tokio::sync::mpsc::Receiver::recv`](crate::sync::mpsc::Receiver::recv)
/// * [`tokio::sync::mpsc::UnboundedReceiver::recv`](crate::sync::mpsc::UnboundedReceiver::recv)
/// * [`tokio::sync::broadcast::Receiver::recv`](crate::sync::broadcast::Receiver::recv)
/// * [`tokio::sync::watch::Receiver::changed`](crate::sync::watch::Receiver::changed)
/// * [`tokio::net::TcpListener::accept`](crate::net::TcpListener::accept)
/// * [`tokio::net::UnixListener::accept`](crate::net::UnixListener::accept)
/// * [`tokio::io::AsyncReadExt::read`](crate::io::AsyncReadExt::read) on any `AsyncRead`
/// * [`tokio::io::AsyncReadExt::read_buf`](crate::io::AsyncReadExt::read_buf) on any `AsyncRead`
/// * [`tokio::io::AsyncWriteExt::write`](crate::io::AsyncWriteExt::write) on any `AsyncWrite`
/// * [`tokio::io::AsyncWriteExt::write_buf`](crate::io::AsyncWriteExt::write_buf) on any `AsyncWrite`
/// * [`tokio_stream::StreamExt::next`](https://docs.rs/tokio-stream/0.1/tokio_stream/trait.StreamExt.html#method.next) on any `Stream`
/// * [`futures::stream::StreamExt::next`](https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.next) on any `Stream`
///
/// The following methods are not cancellation safe and can lead to loss of data:
///
/// * [`tokio::io::AsyncReadExt::read_exact`](crate::io::AsyncReadExt::read_exact)
/// * [`tokio::io::AsyncReadExt::read_to_end`](crate::io::AsyncReadExt::read_to_end)
/// * [`tokio::io::AsyncReadExt::read_to_string`](crate::io::AsyncReadExt::read_to_string)
/// * [`tokio::io::AsyncWriteExt::write_all`](crate::io::AsyncWriteExt::write_all)
///
/// The following methods are not cancellation safe because they use a queue for
/// fairness and cancellation makes you lose your place in the queue:
///
/// * [`tokio::sync::Mutex::lock`](crate::sync::Mutex::lock)
/// * [`tokio::sync::RwLock::read`](crate::sync::RwLock::read)
/// * [`tokio::sync::RwLock::write`](crate::sync::RwLock::write)
/// * [`tokio::sync::Semaphore::acquire`](crate::sync::Semaphore::acquire)
/// * [`tokio::sync::Notify::notified`](crate::sync::Notify::notified)
///
/// To determine whether your own methods are cancellation safe, look for the
/// location of uses of `.await`. This is because when an asynchronous method is
/// cancelled, that always happens at an `.await`. If your function behaves
/// correctly even if it is restarted while waiting at an `.await`, then it is
/// cancellation safe.
///
/// Be aware that cancelling something that is not cancellation safe is not
/// necessarily wrong. For example, if you are cancelling a task because the
/// application is shutting down, then you might not care that partially read
/// data is lost.
///
/// # Examples
///
Expand Down Expand Up @@ -338,6 +315,77 @@
/// }
/// }
/// ```
///
/// ## Avoid racy `if` preconditions
///
/// Given that `if` preconditions are used to disable `select!` branches, some
/// caution must be used to avoid missing values.
///
/// For example, here is **incorrect** usage of `sleep` with `if`. The objective
/// is to repeatedly run an asynchronous task for up to 50 milliseconds.
/// However, there is a potential for the `sleep` completion to be missed.
///
/// ```no_run,should_panic
/// use tokio::time::{self, Duration};
///
/// async fn some_async_work() {
/// // do work
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let sleep = time::sleep(Duration::from_millis(50));
/// tokio::pin!(sleep);
///
/// while !sleep.is_elapsed() {
/// tokio::select! {
/// _ = &mut sleep, if !sleep.is_elapsed() => {
/// println!("operation timed out");
/// }
/// _ = some_async_work() => {
/// println!("operation completed");
/// }
/// }
/// }
///
/// panic!("This example shows how not to do it!");
/// }
/// ```
///
/// In the above example, `sleep.is_elapsed()` may return `true` even if
/// `sleep.poll()` never returned `Ready`. This opens up a potential race
/// condition where `sleep` expires between the `while !sleep.is_elapsed()`
/// check and the call to `select!` resulting in the `some_async_work()` call to
/// run uninterrupted despite the sleep having elapsed.
///
/// One way to write the above example without the race would be:
///
/// ```
/// use tokio::time::{self, Duration};
///
/// async fn some_async_work() {
/// # time::sleep(Duration::from_millis(10)).await;
/// // do work
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let sleep = time::sleep(Duration::from_millis(50));
/// tokio::pin!(sleep);
///
/// loop {
/// tokio::select! {
/// _ = &mut sleep => {
/// println!("operation timed out");
/// break;
/// }
/// _ = some_async_work() => {
/// println!("operation completed");
/// }
/// }
/// }
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! select {
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ impl TcpListener {
/// established, the corresponding [`TcpStream`] and the remote peer's
/// address will be returned.
///
/// This method is cancellation safe in the sense that new connections
/// cannot be lost when using it as the event in a [`select!`](crate::select)
/// statement.
///
/// [`TcpStream`]: struct@crate::net::TcpStream
///
/// # Examples
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ impl TcpStream {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// This method is cancellation safe in the sense that readiness events
/// cannot be lost when using it as the event in a [`select!`](crate::select)
/// statement.
///
/// # Examples
///
/// Concurrently read and write to the stream on the same task without
Expand Down Expand Up @@ -420,6 +424,10 @@ impl TcpStream {
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
///
/// This method is cancellation safe in the sense that readiness events
/// cannot be lost when using it as the event in a [`select!`](crate::select)
/// statement.
///
/// # Examples
///
/// ```no_run
Expand Down Expand Up @@ -725,6 +733,10 @@ impl TcpStream {
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
///
/// This method is cancellation safe in the sense that readiness events
/// cannot be lost when using it as the event in a [`select!`](crate::select)
/// statement.
///
/// # Examples
///
/// ```no_run
Expand Down Expand Up @@ -1152,6 +1164,12 @@ impl TcpStream {
split_owned(self)
}

// == Poll IO functions that takes `&self` ==
//
// To read or write without mutable access to the `UnixStream`, combine the
// `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
// `try_write` methods.

pub(crate) fn poll_read_priv(
&self,
cx: &mut Context<'_>,
Expand Down
Loading