Skip to content

Commit

Permalink
m: Remove most of Pin API related unsafe code
Browse files Browse the repository at this point in the history
- Use Option::as_pin_mut/Pin::set
- Use pin-project-lite
  • Loading branch information
taiki-e authored Aug 9, 2023
1 parent a1c3570 commit 0ea4641
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]

[dependencies]
parking = { version = "2.0.0", optional = true }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.2", default-features = false, optional = true, features = ["alloc"] }

[dependencies.portable_atomic_crate]
Expand Down
115 changes: 60 additions & 55 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ use alloc::boxed::Box;
use core::borrow::Borrow;
use core::fmt;
use core::future::Future;
use core::marker::PhantomPinned;
use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::ptr;
Expand Down Expand Up @@ -633,17 +632,23 @@ impl<T> Drop for Event<T> {
}
}

/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
pub struct EventListener<T = ()>(Listener<T, Arc<Inner<T>>>);
pin_project_lite::pin_project! {
/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
#[project(!Unpin)] // implied by Listener, but can generate better docs
pub struct EventListener<T = ()> {
#[pin]
listener: Listener<T, Arc<Inner<T>>>,
}
}

unsafe impl<T: Send> Send for EventListener<T> {}
unsafe impl<T: Send> Sync for EventListener<T> {}
Expand All @@ -662,10 +667,9 @@ impl<T> EventListener<T> {
let listener = Listener {
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
listener: None,
_pin: PhantomPinned,
};

Self(listener)
Self { listener }
}

/// Register this listener into the given [`Event`].
Expand Down Expand Up @@ -702,7 +706,7 @@ impl<T> EventListener<T> {
/// assert!(!listener.is_listening());
/// ```
pub fn is_listening(&self) -> bool {
self.0.listener.is_some()
self.listener.listener.is_some()
}

/// Blocks until a notification is received.
Expand Down Expand Up @@ -826,11 +830,11 @@ impl<T> EventListener<T> {
}

fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener<T, Arc<Inner<T>>>> {
unsafe { self.map_unchecked_mut(|this| &mut this.0) }
self.project().listener
}

fn inner(&self) -> &Arc<Inner<T>> {
&self.0.event
&self.listener.event
}
}

Expand All @@ -842,37 +846,44 @@ impl<T> Future for EventListener<T> {
}
}

struct Listener<T, B: Borrow<Inner<T>> + Unpin> {
/// The reference to the original event.
event: B,
pin_project_lite::pin_project! {
#[project(!Unpin)]
struct Listener<T, B: Borrow<Inner<T>>>
where
B: Unpin,
{
// The reference to the original event.
event: B,

// The inner state of the listener.
#[pin]
listener: Option<sys::Listener<T>>,
}

/// The inner state of the listener.
listener: Option<sys::Listener<T>>,
impl<T, B: Borrow<Inner<T>>> PinnedDrop for Listener<T, B>
where
B: Unpin,
{
fn drop(mut this: Pin<&mut Self>) {
// If we're being dropped, we need to remove ourself from the list.
let this = this.project();
let inner = (*this.event).borrow();

/// Enforce pinning.
_pin: PhantomPinned,
inner.remove(this.listener, true);
}
}
}

unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Send> Send for Listener<T, B> {}
unsafe impl<T: Send, B: Borrow<Inner<T>> + Unpin + Sync> Sync for Listener<T, B> {}

impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
/// Pin-project this listener.
fn project(self: Pin<&mut Self>) -> (&Inner<T>, Pin<&mut Option<sys::Listener<T>>>) {
// SAFETY: `event` is `Unpin`, and `listener`'s pin status is preserved
unsafe {
let Listener {
event, listener, ..
} = self.get_unchecked_mut();

((*event).borrow(), Pin::new_unchecked(listener))
}
}

/// Register this listener with the event.
fn insert(self: Pin<&mut Self>) {
let (inner, listener) = self.project();
inner.insert(listener);
let this = self.project();
let inner = (*this.event).borrow();

inner.insert(this.listener);
}

/// Wait until the provided deadline.
Expand Down Expand Up @@ -917,10 +928,11 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
parker: &Parker,
unparker: TaskRef<'_>,
) -> Option<T> {
let (inner, mut listener) = self.project();
let mut this = self.project();
let inner = (*this.event).borrow();

// Set the listener's state to `Task`.
if let Some(tag) = inner.register(listener.as_mut(), unparker).notified() {
if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
// We were already notified, so we don't need to park.
return Some(tag);
}
Expand All @@ -936,15 +948,15 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
if now >= deadline {
// Remove our entry and check if we were notified.
return inner
.remove(listener, false)
.remove(this.listener, false)
.expect("We never removed ourself from the list")
.notified();
}
}
}

// See if we were notified.
if let Some(tag) = inner.register(listener.as_mut(), unparker).notified() {
if let Some(tag) = inner.register(this.listener.as_mut(), unparker).notified() {
return Some(tag);
}
}
Expand All @@ -953,20 +965,22 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
/// Drops this listener and discards its notification (if any) without notifying another
/// active listener.
fn discard(self: Pin<&mut Self>) -> bool {
let (inner, listener) = self.project();
let this = self.project();
let inner = (*this.event).borrow();

inner
.remove(listener, false)
.remove(this.listener, false)
.map_or(false, |state| state.is_notified())
}

/// Poll this listener for a notification.
fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
let (inner, mut listener) = self.project();
let mut this = self.project();
let inner = (*this.event).borrow();

// Try to register the listener.
match inner
.register(listener.as_mut(), TaskRef::Waker(cx.waker()))
.register(this.listener.as_mut(), TaskRef::Waker(cx.waker()))
.notified()
{
Some(tag) => {
Expand All @@ -982,15 +996,6 @@ impl<T, B: Borrow<Inner<T>> + Unpin> Listener<T, B> {
}
}

impl<T, B: Borrow<Inner<T>> + Unpin> Drop for Listener<T, B> {
fn drop(&mut self) {
// If we're being dropped, we need to remove ourself from the list.
let (inner, listener) = unsafe { Pin::new_unchecked(self).project() };

inner.remove(listener, true);
}
}

/// The state of a listener.
#[derive(Debug, PartialEq)]
enum State<T> {
Expand Down
37 changes: 15 additions & 22 deletions src/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,23 @@ impl<T> crate::Inner<T> {
/// Add a new listener to the list.
///
/// Does nothing is the listener is already registered.
pub(crate) fn insert(&self, listener: Pin<&mut Option<Listener<T>>>) {
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
// SAFETY: We never move out the `link` field.
let listener = match listener.get_unchecked_mut() {
listener @ None => {
// TODO: Use Option::insert once the MSRV is high enough.
*listener = Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
});

listener.as_mut().unwrap()
}
Some(_) => return,
};
if listener.is_some() {
return;
}
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

// Get the inner pointer.
&*listener.link.get()
Expand Down Expand Up @@ -127,8 +122,7 @@ impl<T> crate::Inner<T> {

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
// SAFETY: We never move out the `link` field.
let listener = match listener.as_mut().get_unchecked_mut().as_mut() {
let listener = match listener.as_mut().as_pin_mut() {
Some(listener) => listener,
None => return RegisterResult::NeverInserted,
};
Expand Down Expand Up @@ -172,8 +166,7 @@ impl<T> Inner<T> {
propagate: bool,
) -> Option<State<T>> {
let entry = unsafe {
// SAFETY: We never move out the `link` field.
let listener = listener.as_mut().get_unchecked_mut().as_mut()?;
let listener = listener.as_mut().as_pin_mut()?;

// Get the inner pointer.
&*listener.link.get()
Expand Down

0 comments on commit 0ea4641

Please sign in to comment.