Skip to content

Commit

Permalink
util/io: Add SyncIoBridge
Browse files Browse the repository at this point in the history
I'm doing quite a bit of stuff inside `spawn_blocking` because
I need to use some large synchronous APIs.

I found it not entirely obvious how to "bridge" the world of
async I/O with sync I/O.

Since we have a handy "tokio-util" module with an "io" feature,
these small helpers seem like a good fit hopefully!

Extracted these from my code in https://github.com/ostreedev/ostree-rs-ext/blob/main/lib/src/async_util.rs
  • Loading branch information
cgwalters committed Oct 4, 2021
1 parent 9ff7d8c commit cb4f217
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 1 deletion.
8 changes: 7 additions & 1 deletion tokio-util/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
//! Helpers for IO related tasks.
//!
//! These types are often used in combination with hyper or reqwest, as they
//! The stream types are often used in combination with hyper or reqwest, as they
//! allow converting between a hyper [`Body`] and [`AsyncRead`].
//!
//! The [`SyncIoBridge`] type converts from the world of async I/O
//! to synchronous I/O; this may often come up when using synchronous APIs
//! inside [`tokio::task::spawn_blocking`].
//!
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead

mod read_buf;
mod reader_stream;
mod stream_reader;
mod sync_bridge;

pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
pub use self::sync_bridge::SyncIoBridge;
pub use crate::util::{poll_read_buf, poll_write_buf};
78 changes: 78 additions & 0 deletions tokio-util/src/io/sync_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::io::{Read, Write};
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
#[derive(Debug)]
pub struct SyncIoBridge<T> {
src: Pin<Box<T>>,
rt: tokio::runtime::Handle,
}

impl<T: AsyncRead> Read for SyncIoBridge<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.read(buf))
}
}

impl<T: AsyncWrite> Write for SyncIoBridge<T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write(buf))
}

fn flush(&mut self) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.flush())
}

fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.write_all(buf))
}

fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write_vectored(bufs))
}
}

// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time
// of this writing still unstable, we expose this as part of a standalone method.
impl<T: AsyncWrite> SyncIoBridge<T> {
/// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes.
///
/// See [`tokio::io::AsyncWrite::is_write_vectored`].
pub fn is_write_vectored(&self) -> bool {
self.src.is_write_vectored()
}
}

impl<T: AsyncRead> SyncIoBridge<T> {
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// This is useful with e.g. [`tokio::task::spawn_blocking`].
///
/// This works via capturing a handle to the current thread's runtime with [`tokio::runtime::Handle::current`].
/// Synchronous I/O will use that handle to block on the backing asynchronous source.
///
/// # Panic
///
/// This will panic if called outside the context of a Tokio runtime.
pub fn new(src: T) -> Self {
Self::new_with_handle(src, tokio::runtime::Handle::current())
}

/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may
/// be invoked outside of an asynchronous context.
pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self {
let src = Box::pin(src);
Self { src, rt }
}
}
43 changes: 43 additions & 0 deletions tokio-util/tests/io_sync_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#![cfg(feature = "io")]

use std::error::Error;
use std::io::{Cursor, Read, Result as IoResult};
use tokio::io::AsyncRead;
use tokio_util::io::SyncIoBridge;

async fn test_reader_len(
r: impl AsyncRead + Unpin + Send + 'static,
expected_len: usize,
) -> IoResult<()> {
let mut r = SyncIoBridge::new(r);
let res = tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
r.read_to_end(&mut buf)?;
Ok::<_, std::io::Error>(buf)
})
.await?;
assert_eq!(res?.len(), expected_len);
Ok(())
}

#[tokio::test]
async fn test_async_read_to_sync() -> Result<(), Box<dyn Error>> {
test_reader_len(tokio::io::empty(), 0).await?;
let buf = b"hello world";
test_reader_len(Cursor::new(buf), buf.len()).await?;
Ok(())
}

#[tokio::test]
async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> {
let mut dest = Vec::new();
let src = b"hello world";
let dest = tokio::task::spawn_blocking(move || -> Result<_, String> {
let mut w = SyncIoBridge::new(Cursor::new(&mut dest));
std::io::copy(&mut Cursor::new(src), &mut w).map_err(|e| e.to_string())?;
Ok(dest)
})
.await??;
assert_eq!(dest.as_slice(), src);
Ok(())
}

0 comments on commit cb4f217

Please sign in to comment.