Skip to content

Commit

Permalink
docs: add examples to tokio-stream::wrappers docs
Browse files Browse the repository at this point in the history
  • Loading branch information
joshka committed Dec 8, 2024
1 parent eb72ddd commit 95b81d0
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 0 deletions.
22 changes: 22 additions & 0 deletions tokio-stream/src/wrappers/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,28 @@ use std::task::{ready, Context, Poll};

/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::sync::broadcast;
/// use tokio_stream::wrappers::BroadcastStream;
/// use tokio_stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, rx) = broadcast::channel(16);
/// tx.send(10)?;
/// tx.send(20)?;
/// drop(tx);
///
/// let mut stream = BroadcastStream::new(rx);
/// assert_eq!(stream.next().await, Some(Ok(10)));
/// assert_eq!(stream.next().await, Some(Ok(20)));
/// assert_eq!(stream.next().await, None);
/// Ok(())
/// }
/// ```
///
/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
/// [`Stream`]: trait@futures_core::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
Expand Down
20 changes: 20 additions & 0 deletions tokio-stream/src/wrappers/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,26 @@ use tokio::time::{Instant, Interval};

/// A wrapper around [`Interval`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::time::{Duration, Instant};
/// use tokio_stream::wrappers::IntervalStream;
/// use tokio_stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let interval = tokio::time::interval(Duration::from_millis(10));
/// let mut stream = IntervalStream::new(interval);
/// let start = Instant::now();
/// for _ in 0..3 {
/// if let Some(instant) = stream.next().await {
/// println!("elapsed: {:.1?}", instant.duration_since(start));
/// }
/// }
/// }
/// ```
///
/// [`Interval`]: struct@tokio::time::Interval
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
19 changes: 19 additions & 0 deletions tokio-stream/src/wrappers/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ use tokio::io::{AsyncBufRead, Lines};
pin_project! {
/// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::io::AsyncBufReadExt;
/// use tokio_stream::wrappers::LinesStream;
/// use tokio_stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let input = b"Hello\nWorld\n";
/// let lines = input.lines(); // tokio::io::util::Lines
/// let mut stream = LinesStream::new(lines);
/// while let Some(line) = stream.next().await {
/// println!("{}", line?);
/// }
/// Ok(())
/// }
/// ```
///
/// [`tokio::io::Lines`]: struct@tokio::io::Lines
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
22 changes: 22 additions & 0 deletions tokio-stream/src/wrappers/mpsc_bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,28 @@ use tokio::sync::mpsc::Receiver;

/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::sync::mpsc;
/// use tokio_stream::wrappers::ReceiverStream;
/// use tokio_stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, rx) = mpsc::channel(2);
/// tx.send(10).await?;
/// tx.send(20).await?;
/// drop(tx);
///
/// let mut stream = ReceiverStream::new(rx);
/// assert_eq!(stream.next().await, Some(10));
/// assert_eq!(stream.next().await, Some(20));
/// assert_eq!(stream.next().await, None);
/// Ok(())
/// }
/// ```
///
/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
21 changes: 21 additions & 0 deletions tokio-stream/src/wrappers/mpsc_unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ use tokio::sync::mpsc::UnboundedReceiver;

/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::sync::mpsc;
/// use tokio_stream::wrappers::UnboundedReceiverStream;
/// use tokio_stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel();
/// tx.send(10).unwrap();
/// tx.send(20).unwrap();
/// drop(tx);
///
/// let mut stream = UnboundedReceiverStream::new(rx);
/// assert_eq!(stream.next().await, Some(10));
/// assert_eq!(stream.next().await, Some(20));
/// assert_eq!(stream.next().await, None);
/// }
/// ```
///
/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions tokio-stream/src/wrappers/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ use tokio::fs::{DirEntry, ReadDir};

/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
///
/// # Example
///
/// ```
/// use tokio::fs::read_dir;
/// use tokio_stream::{wrappers::ReadDirStream, StreamExt};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let dirs = read_dir("examples").await?;
/// let mut dirs = ReadDirStream::new(dirs);
/// while let Some(dir) = dirs.next().await {
/// let dir = dir?;
/// println!("{}", dir.path().display());
/// }
/// Ok(())
/// }
/// ```
///
/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down
28 changes: 28 additions & 0 deletions tokio-stream/src/wrappers/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,34 @@ use tokio::net::{TcpListener, TcpStream};

/// A wrapper around [`TcpListener`] that implements [`Stream`].
///
/// # Example
///
/// Accept connections from both IPv4 and IPv6 listeners in the same loop:
///
/// ```no_run
/// use std::net::{Ipv4Addr, Ipv6Addr};
///
/// use tokio::net::TcpListener;
/// use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let ipv4_listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 8080)).await?;
/// let ipv6_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 8080)).await?;
/// let ipv4_connections = TcpListenerStream::new(ipv4_listener);
/// let ipv6_connections = TcpListenerStream::new(ipv6_listener);
/// let mut connections = ipv4_connections.chain(ipv6_connections);
/// while let Some(tcp_stream) = connections.next().await {
/// let stream = tcp_stream?;
/// println!(
/// "accepted connection; peer address = {:?}",
/// stream.peer_addr()?
/// );
/// }
/// Ok(())
/// }
/// ```
///
/// [`TcpListener`]: struct@tokio::net::TcpListener
/// [`Stream`]: trait@crate::Stream
#[derive(Debug)]
Expand Down

0 comments on commit 95b81d0

Please sign in to comment.