diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index 2402207584c..050b7cf54b8 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -6,6 +6,258 @@ use tokio::io::{ /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`]. +/// +/// # Alternatives +/// +/// In many cases, there are better alternatives to using `SyncIoBridge`, especially +/// if you want to avoid blocking the async runtime. Consider the following scenarios: +/// +/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and +/// might not fully leverage the async capabilities of the system. +/// +/// ### Why It Matters: +/// +/// `SyncIoBridge` allows you to use synchronous I/O operations in an asynchronous +/// context by blocking the current thread. However, this can be inefficient because: +/// +/// - **Blocking**: The use of `SyncIoBridge` may block a valuable async runtime +/// thread, which could otherwise be used to handle more tasks concurrently. +/// +/// - **Thread Pool Saturation**: If many threads are blocked using `SyncIoBridge`, +/// it can exhaust the async runtime's thread pool, leading to increased latency +/// and reduced throughput. +/// +/// - **Lack of Parallelism**: By blocking on synchronous operations, you may miss +/// out on the benefits of running tasks concurrently, especially in I/O-bound +/// operations where async tasks could be interleaved. +/// +/// ## Example 1: Hashing Data +/// +/// The use of `SyncIoBridge` is unnecessary when hashing data. Instead, you can +/// process the data asynchronously by reading it into memory, which avoids blocking +/// the async runtime. +/// +/// There are two strategies for avoiding `SyncIoBridge` when hashing data. 
When +/// the data fits into memory, the easiest is to read the data into a `Vec` +/// and hash it: +/// +/// ```rust +/// use tokio::io::AsyncReadExt; +/// use tokio::io::AsyncRead; +/// use std::io::Cursor; +/// # mod blake3 { pub fn hash(_: &[u8]) {} } +/// +/// async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> { +/// // Read all data from the reader into a Vec. +/// let mut data = Vec::new(); +/// reader.read_to_end(&mut data).await?; +/// +/// // Hash the data using the blake3 hashing function. +/// let hash = blake3::hash(&data); +/// +/// Ok(()) +///} +/// +/// #[tokio::main] +/// async fn main() -> Result<(), std::io::Error> { +/// // Example: In-memory data. +/// let data = b"Hello, world!"; // A byte slice. +/// let reader = Cursor::new(data); // Create an in-memory AsyncRead. +/// hash_contents(reader).await +/// } +/// ``` +/// Explanation: This example demonstrates how to asynchronously read data from a +/// reader into memory and hash it using a synchronous hashing function. The +/// `SyncIoBridge` is avoided, ensuring that the async runtime is not blocked. +/// +/// +/// When the data doesn't fit into memory, the hashing library will usually +/// provide a `hasher` that you can repeatedly call `update` on to hash the data +/// one chunk at a time. For example: +/// +/// ```rust +/// use tokio::io::AsyncReadExt; +/// use tokio::io::AsyncRead; +/// use std::io::Cursor; +/// # struct Hasher; +/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} } +/// +/// /// Asynchronously streams data from an async reader, processes it in chunks, +/// /// and hashes the data incrementally. +/// async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> { +/// // Create a buffer to read data into, sized for performance. +/// let mut data = vec![0; 64 * 1024]; +/// loop { +/// // Read data from the reader into the buffer.
+/// let len = reader.read(&mut data).await?; +/// if len == 0 { break; } // Exit loop if no more data. +/// +/// // Update the hash with the data read. +/// hasher.update(&data[..len]); +/// } +/// +/// // Finalize the hash after all data has been processed. +/// let hash = hasher.finalize(); +/// +/// Ok(()) +///} +/// +/// #[tokio::main] +/// async fn main() -> Result<(), std::io::Error> { +/// // Example: In-memory data. +/// let data = b"Hello, world!"; // A byte slice. +/// let reader = Cursor::new(data); // Create an in-memory AsyncRead. +/// let hasher = Hasher; +/// hash_stream(reader, hasher).await +/// } +/// ``` +/// Explanation: This example demonstrates how to asynchronously stream data in +/// chunks for hashing. Each chunk is read asynchronously, and the hash is updated +/// incrementally. This avoids blocking and improves performance over using +/// `SyncIoBridge`. +/// +/// +/// ## Example 2: Compressing Data +/// +/// When compressing data, the use of `SyncIoBridge` is unnecessary as it introduces +/// blocking and inefficient code. Instead, you can utilize an async compression library +/// such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/) +/// crate, which is built to handle asynchronous data streams efficiently. +/// +/// ```ignore +/// use async_compression::tokio::write::GzipEncoder; +/// use tokio::io::AsyncReadExt; +/// use std::io::Cursor; +/// use tokio::io::AsyncRead; +/// +/// /// Asynchronously compresses data from an async reader using Gzip and an async encoder. +/// async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> { +/// let mut writer = tokio::io::sink(); +/// +/// // Create a Gzip encoder that wraps the writer. +/// let mut encoder = GzipEncoder::new(writer); +/// +/// // Copy data from the reader to the encoder, compressing it.
+/// tokio::io::copy(&mut reader, &mut encoder).await?; +/// +/// Ok(()) +///} +/// +/// #[tokio::main] +/// async fn main() -> Result<(), std::io::Error> { +/// // Example: In-memory data. +/// let data = b"Hello, world!"; // A byte slice. +/// let reader = Cursor::new(data); // Create an in-memory AsyncRead. +/// compress_data(reader).await +/// } +/// ``` +/// Explanation: This example shows how to asynchronously compress data using an +/// async compression library. By reading and writing asynchronously, it avoids +/// blocking and is more efficient than using `SyncIoBridge` with a non-async +/// compression library. +/// +/// +/// ## Example 3: Parsing `JSON` +/// +/// +/// `SyncIoBridge` is not ideal when parsing data formats such as `JSON`, as it +/// blocks async operations. A more efficient approach is to read data asynchronously +/// into memory and then `deserialize` it, avoiding unnecessary synchronization overhead. +/// +/// ```rust,no_run +/// use tokio::io::AsyncRead; +/// use tokio::io::AsyncReadExt; +/// use std::io::Cursor; +/// # mod serde { +/// # pub trait DeserializeOwned: 'static {} +/// # impl<T: 'static> DeserializeOwned for T {} +/// # } +/// # mod serde_json { +/// # use super::serde::DeserializeOwned; +/// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> { +/// # unimplemented!() +/// # } +/// # } +/// # #[derive(Debug)] struct MyStruct; +/// +/// +/// async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> { +/// // Read all data from the reader into a Vec. +/// let mut data = Vec::new(); +/// reader.read_to_end(&mut data).await?; +/// +/// // Deserialize the data from the Vec into a MyStruct instance. +/// let value: MyStruct = serde_json::from_slice(&data)?; +/// +/// Ok(value) +///} +/// +/// #[tokio::main] +/// async fn main() -> Result<(), std::io::Error> { +/// // Example: In-memory data. +/// let data = b"Hello, world!"; // A byte slice. +/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
+/// parse_json(reader).await?; +/// Ok(()) +/// } +/// ``` +/// Explanation: This example shows how to asynchronously read data into memory +/// and then parse it as `JSON`. By avoiding `SyncIoBridge`, the asynchronous runtime +/// remains unblocked, leading to better performance when working with asynchronous +/// I/O streams. +/// +/// +/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking` +/// +/// `SyncIoBridge` is mainly useful when you need to interface with synchronous +/// libraries from an asynchronous context. Here is how you can do it correctly: +/// +/// ```rust +/// use tokio::task::spawn_blocking; +/// use tokio_util::io::SyncIoBridge; +/// use tokio::io::AsyncRead; +/// use std::marker::Unpin; +/// use std::io::Cursor; +/// +/// /// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task. +/// async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> { +/// // Wrap the async reader with `SyncIoBridge` to allow synchronous reading. +/// let mut sync_reader = SyncIoBridge::new(reader); +/// +/// // Spawn a blocking task to perform synchronous I/O operations. +/// let result = spawn_blocking(move || { +/// // Create an in-memory buffer to hold the copied data. +/// let mut buffer = Vec::new(); +/// +/// // Copy data from the sync_reader to the buffer. +/// std::io::copy(&mut sync_reader, &mut buffer)?; +/// +/// // Return the buffer containing the copied data. +/// Ok::<_, std::io::Error>(buffer) +/// }) +/// .await??; +/// +/// // Return the result from the blocking task. +/// Ok(result) +///} +/// +/// #[tokio::main] +/// async fn main() -> Result<(), std::io::Error> { +/// // Example: In-memory data. +/// let data = b"Hello, world!"; // A byte slice. +/// let reader = Cursor::new(data); // Create an in-memory AsyncRead. +/// let result = process_sync_io(reader).await?; +/// +/// // You can use `result` here as needed.
+/// +/// Ok(()) +/// } +/// ``` +/// Explanation: This example shows how to use `SyncIoBridge` inside a `spawn_blocking` +/// task to safely perform synchronous I/O without blocking the async runtime. The +/// `spawn_blocking` ensures that the synchronous code is offloaded to a dedicated +/// thread pool, preventing it from interfering with the async tasks. +/// #[derive(Debug)] pub struct SyncIoBridge { src: T,