Skip to content

Commit

Permalink
stream: add examples to wrapper types (#7024)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshka authored Dec 10, 2024
1 parent 79a2afa commit 6d15c6c
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 0 deletions.
23 changes: 23 additions & 0 deletions tokio-stream/src/wrappers/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ use std::task::{ready, Context, Poll};

/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::sync::broadcast;
/// use tokio_stream::wrappers::BroadcastStream;
/// use tokio_stream::StreamExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), tokio::sync::broadcast::error::SendError<u8>> {
/// let (tx, rx) = broadcast::channel(16);
/// tx.send(10)?;
/// tx.send(20)?;
/// # // prevent the doc test from hanging
/// drop(tx);
///
/// let mut stream = BroadcastStream::new(rx);
/// assert_eq!(stream.next().await, Some(Ok(10)));
/// assert_eq!(stream.next().await, Some(Ok(20)));
/// assert_eq!(stream.next().await, None);
/// # Ok(())
/// # }
/// ```
///
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
/// [`Stream`]: trait@futures_core::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
Expand Down
20 changes: 20 additions & 0 deletions tokio-stream/src/wrappers/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,26 @@ use tokio::time::{Instant, Interval};

/// A wrapper around [`Interval`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::time::{Duration, Instant, interval};
/// use tokio_stream::wrappers::IntervalStream;
/// use tokio_stream::StreamExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let start = Instant::now();
/// let interval = interval(Duration::from_millis(10));
/// let mut stream = IntervalStream::new(interval);
/// for _ in 0..3 {
/// if let Some(instant) = stream.next().await {
/// println!("elapsed: {:.1?}", instant.duration_since(start));
/// }
/// }
/// # }
/// ```
///
/// [`Interval`]: struct@tokio::time::Interval
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions tokio-stream/src/wrappers/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ use tokio::io::{AsyncBufRead, Lines};
pin_project! {
/// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::io::AsyncBufReadExt;
/// use tokio_stream::wrappers::LinesStream;
/// use tokio_stream::StreamExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let input = b"Hello\nWorld\n";
/// let mut stream = LinesStream::new(input.lines());
/// while let Some(line) = stream.next().await {
/// println!("{}", line?);
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`tokio::io::Lines`]: struct@tokio::io::Lines
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
23 changes: 23 additions & 0 deletions tokio-stream/src/wrappers/mpsc_bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ use tokio::sync::mpsc::Receiver;

/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::sync::mpsc;
/// use tokio_stream::wrappers::ReceiverStream;
/// use tokio_stream::StreamExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
/// let (tx, rx) = mpsc::channel(2);
/// tx.send(10).await?;
/// tx.send(20).await?;
/// # // prevent the doc test from hanging
/// drop(tx);
///
/// let mut stream = ReceiverStream::new(rx);
/// assert_eq!(stream.next().await, Some(10));
/// assert_eq!(stream.next().await, Some(20));
/// assert_eq!(stream.next().await, None);
/// # Ok(())
/// # }
/// ```
///
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
23 changes: 23 additions & 0 deletions tokio-stream/src/wrappers/mpsc_unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,29 @@ use tokio::sync::mpsc::UnboundedReceiver;

/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::sync::mpsc;
/// use tokio_stream::wrappers::UnboundedReceiverStream;
/// use tokio_stream::StreamExt;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
/// let (tx, rx) = mpsc::unbounded_channel();
/// tx.send(10)?;
/// tx.send(20)?;
/// # // prevent the doc test from hanging
/// drop(tx);
///
/// let mut stream = UnboundedReceiverStream::new(rx);
/// assert_eq!(stream.next().await, Some(10));
/// assert_eq!(stream.next().await, Some(20));
/// assert_eq!(stream.next().await, None);
/// # Ok(())
/// # }
/// ```
///
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions tokio-stream/src/wrappers/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ use tokio::fs::{DirEntry, ReadDir};

/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::fs::read_dir;
/// use tokio_stream::{StreamExt, wrappers::ReadDirStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let dirs = read_dir(".").await?;
/// let mut dirs = ReadDirStream::new(dirs);
/// while let Some(dir) = dirs.next().await {
/// let dir = dir?;
/// println!("{}", dir.path().display());
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
16 changes: 16 additions & 0 deletions tokio-stream/src/wrappers/signal_unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ use tokio::signal::unix::Signal;

/// A wrapper around [`Signal`] that implements [`Stream`].
///
/// # Example
///
/// ```no_run
/// use tokio::signal::unix::{signal, SignalKind};
/// use tokio_stream::{StreamExt, wrappers::SignalStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let signals = signal(SignalKind::hangup())?;
/// let mut stream = SignalStream::new(signals);
/// while stream.next().await.is_some() {
/// println!("hangup signal received");
/// }
/// # Ok(())
/// # }
/// ```
/// [`Signal`]: struct@tokio::signal::unix::Signal
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
34 changes: 34 additions & 0 deletions tokio-stream/src/wrappers/signal_windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ use tokio::signal::windows::{CtrlBreak, CtrlC};
///
/// [`CtrlC`]: struct@tokio::signal::windows::CtrlC
/// [`Stream`]: trait@crate::Stream
///
/// # Example
///
/// ```no_run
/// use tokio::signal::windows::ctrl_c;
/// use tokio_stream::{StreamExt, wrappers::CtrlCStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let signals = ctrl_c()?;
/// let mut stream = CtrlCStream::new(signals);
/// while stream.next().await.is_some() {
/// println!("ctrl-c received");
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
pub struct CtrlCStream {
Expand Down Expand Up @@ -47,6 +64,23 @@ impl AsMut<CtrlC> for CtrlCStream {

/// A wrapper around [`CtrlBreak`] that implements [`Stream`].
///
/// # Example
///
/// ```no_run
/// use tokio::signal::windows::ctrl_break;
/// use tokio_stream::{StreamExt, wrappers::CtrlBreakStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let signals = ctrl_break()?;
/// let mut stream = CtrlBreakStream::new(signals);
/// while stream.next().await.is_some() {
/// println!("ctrl-break received");
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions tokio-stream/src/wrappers/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ use tokio::io::{AsyncBufRead, Split};
pin_project! {
/// A wrapper around [`tokio::io::Split`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::io::AsyncBufReadExt;
/// use tokio_stream::{StreamExt, wrappers::SplitStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let input = "Hello\nWorld\n".as_bytes();
/// let lines = AsyncBufReadExt::split(input, b'\n');
///
/// let mut stream = SplitStream::new(lines);
/// while let Some(line) = stream.next().await {
/// println!("length = {}", line?.len())
/// }
/// # Ok(())
/// # }
/// ```
/// [`tokio::io::Split`]: struct@tokio::io::Split
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
27 changes: 27 additions & 0 deletions tokio-stream/src/wrappers/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,33 @@ use tokio::net::{TcpListener, TcpStream};

/// A wrapper around [`TcpListener`] that implements [`Stream`].
///
/// # Example
///
/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
///
/// ```no_run
/// use std::net::{Ipv4Addr, Ipv6Addr};
///
/// use tokio::net::TcpListener;
/// use tokio_stream::{StreamExt, wrappers::TcpListenerStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
///
/// let mut connections = ipv4_connections.chain(ipv6_connections);
/// while let Some(tcp_stream) = connections.next().await {
/// let stream = tcp_stream?;
/// let peer_addr = stream.peer_addr()?;
/// println!("accepted connection; peer address = {peer_addr}");
/// }
/// # Ok(())
/// # }
/// ```
///
/// [`TcpListener`]: struct@tokio::net::TcpListener
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
19 changes: 19 additions & 0 deletions tokio-stream/src/wrappers/unix_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ use tokio::net::{UnixListener, UnixStream};

/// A wrapper around [`UnixListener`] that implements [`Stream`].
///
/// # Example
///
/// ```no_run
/// use tokio::net::UnixListener;
/// use tokio_stream::{StreamExt, wrappers::UnixListenerStream};
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> std::io::Result<()> {
/// let listener = UnixListener::bind("/tmp/sock")?;
/// let mut incoming = UnixListenerStream::new(listener);
///
/// while let Some(stream) = incoming.next().await {
/// let stream = stream?;
/// let peer_addr = stream.peer_addr()?;
/// println!("Accepted connection from: {peer_addr:?}");
/// }
/// # Ok(())
/// # }
/// ```
/// [`UnixListener`]: struct@tokio::net::UnixListener
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down

0 comments on commit 6d15c6c

Please sign in to comment.