Skip to content

Commit

Permalink
sync: support mpsc send with &self (#2861)
Browse files Browse the repository at this point in the history
Updates the mpsc channel to use the intrusive waker based sempahore.
This enables using `Sender` with `&self`.

Instead of using `Sender::poll_ready` to ensure capacity and updating
the `Sender` state, `async fn Sender::reserve()` is added. This function
returns a `Permit` value representing the reserved capacity.

Fixes: #2637
Refs: #2718 (intrusive waiters)
  • Loading branch information
carllerche authored Sep 25, 2020
1 parent 4186b0a commit cf025ba
Show file tree
Hide file tree
Showing 19 changed files with 459 additions and 2,561 deletions.
4 changes: 3 additions & 1 deletion tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ impl Inner {
}

fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
self.rx.poll_recv(cx)
use futures_core::stream::Stream;

Pin::new(&mut self.rx).poll_next(cx)
}

fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> {
Expand Down
30 changes: 1 addition & 29 deletions tokio/src/signal/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,35 +391,7 @@ impl Signal {
poll_fn(|cx| self.poll_recv(cx)).await
}

/// Polls to receive the next signal notification event, outside of an
/// `async` context.
///
/// `None` is returned if no more events can be received by this stream.
///
/// # Examples
///
/// Polling from a manually implemented future
///
/// ```rust,no_run
/// use std::pin::Pin;
/// use std::future::Future;
/// use std::task::{Context, Poll};
/// use tokio::signal::unix::Signal;
///
/// struct MyFuture {
/// signal: Signal,
/// }
///
/// impl Future for MyFuture {
/// type Output = Option<()>;
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// println!("polling MyFuture");
/// self.signal.poll_recv(cx)
/// }
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.rx.poll_recv(cx)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ pub trait StreamExt: Stream {
/// # #[tokio::main(basic_scheduler)]
/// async fn main() {
/// # time::pause();
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
/// let (tx1, rx1) = mpsc::channel(10);
/// let (tx2, rx2) = mpsc::channel(10);
///
/// let mut rx = rx1.merge(rx2);
///
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/stream_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ use std::task::{Context, Poll};
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx1, rx1) = mpsc::channel(10);
/// let (mut tx2, rx2) = mpsc::channel(10);
/// let (tx1, rx1) = mpsc::channel(10);
/// let (tx2, rx2) = mpsc::channel(10);
///
/// tokio::spawn(async move {
/// tx1.send(1).await.unwrap();
Expand Down
10 changes: 7 additions & 3 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ impl Semaphore {
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
Expand All @@ -185,6 +184,11 @@ impl Semaphore {
}
}

/// Returns true if the semaphore is closed
pub(crate) fn is_closed(&self) -> bool {
self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
}

pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
Expand All @@ -194,8 +198,8 @@ impl Semaphore {
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?git
if curr & Self::CLOSED > 0 {
// Has the semaphore closed?
if curr & Self::CLOSED == Self::CLOSED {
return Err(TryAcquireError::Closed);
}

Expand Down
9 changes: 4 additions & 5 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let (mut tx, mut rx) = mpsc::channel(100);
//! let (tx, mut rx) = mpsc::channel(100);
//!
//! tokio::spawn(async move {
//! for i in 0..10 {
Expand Down Expand Up @@ -150,7 +150,7 @@
//! for _ in 0..10 {
//! // Each task needs its own `tx` handle. This is done by cloning the
//! // original handle.
//! let mut tx = tx.clone();
//! let tx = tx.clone();
//!
//! tokio::spawn(async move {
//! tx.send(&b"data to write"[..]).await.unwrap();
Expand Down Expand Up @@ -213,7 +213,7 @@
//!
//! // Spawn tasks that will send the increment command.
//! for _ in 0..10 {
//! let mut cmd_tx = cmd_tx.clone();
//! let cmd_tx = cmd_tx.clone();
//!
//! join_handles.push(tokio::spawn(async move {
//! let (resp_tx, resp_rx) = oneshot::channel();
Expand Down Expand Up @@ -443,7 +443,6 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

Expand Down Expand Up @@ -473,7 +472,7 @@ cfg_not_sync! {

cfg_signal_internal! {
pub(crate) mod mpsc;
pub(crate) mod semaphore_ll;
pub(crate) mod batch_semaphore;
}
}

Expand Down
Loading

0 comments on commit cf025ba

Please sign in to comment.