Skip to content

Commit

Permalink
Split the EventListener struct into a sub-listener structure
Browse files Browse the repository at this point in the history
First part of EventListener reworks. The idea here is that, in the
future, we can add an "EventListenerRef" structure that just takes
an "&'a Inner" rather than an "Arc<Inner>". This way, you don't
need to do an atomic clone for listening.
  • Loading branch information
notgull committed Mar 31, 2023
1 parent 0c76968 commit de1f13a
Showing 1 changed file with 119 additions and 89 deletions.
208 changes: 119 additions & 89 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod sys;

use alloc::sync::Arc;

use core::borrow::Borrow;
use core::fmt;
use core::future::Future;
use core::mem::ManuallyDrop;
Expand Down Expand Up @@ -176,12 +177,12 @@ impl Event {
let inner = self.inner();

// Register the listener.
let mut listener = EventListener {
inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
state: sys::Listener::Discarded,
};
let mut listener = EventListener(Listener {
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
listener: sys::Listener::Discarded,
});

listener.inner.insert(&mut listener.state);
listener.0.event.insert(&mut listener.0.listener);

// Make sure the listener is registered before whatever happens next.
full_fence();
Expand Down Expand Up @@ -443,12 +444,12 @@ impl Default for Event {
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
pub struct EventListener {
/// A reference to [`Event`]'s inner state.
inner: Arc<Inner>,
pub struct EventListener(Listener<Arc<Inner>>);

/// The current state of the listener.
state: sys::Listener,
impl fmt::Debug for EventListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("EventListener { .. }")
}
}

#[cfg(feature = "std")]
Expand All @@ -475,7 +476,7 @@ impl EventListener {
/// listener.wait();
/// ```
pub fn wait(self) {
self.wait_internal(None);
self.0.wait_internal(None);
}

/// Blocks until a notification is received or a timeout is reached.
Expand All @@ -495,7 +496,7 @@ impl EventListener {
/// assert!(!listener.wait_timeout(Duration::from_secs(1)));
/// ```
pub fn wait_timeout(self, timeout: Duration) -> bool {
self.wait_internal(Some(Instant::now() + timeout))
self.0.wait_internal(Instant::now().checked_add(timeout))
}

/// Blocks until a notification is received or a deadline is reached.
Expand All @@ -509,69 +510,14 @@ impl EventListener {
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener = event.listen();
/// let mut listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
/// assert!(!listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)));
/// ```
#[cfg(feature = "std")]
pub fn wait_deadline(self, deadline: Instant) -> bool {
self.wait_internal(Some(deadline))
}

fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
let (parker, unparker) = parking::pair();

// Set the listener's state to `Task`.
match self
.inner
.register(&mut self.state, TaskRef::Unparker(&unparker))
{
Some(true) => {
// We were already notified, so we don't need to park.
return true;
}

Some(false) => {
// We're now waiting for a notification.
}

None => {
// We were never inserted into the list.
panic!("listener was never inserted into the list");
}
}

// Wait until a notification is received or the timeout is reached.
loop {
match deadline {
None => parker.park(),

Some(deadline) => {
// Check for timeout.
let now = Instant::now();
if now >= deadline {
// Remove our entry and check if we were notified.
return self
.inner
.remove(&mut self.state, false)
.expect("We never removed ourself from the list")
.is_notified();
}

// Park until the deadline.
parker.park_timeout(deadline - now);
}
}

// See if we were notified.
if self
.inner
.register(&mut self.state, TaskRef::Unparker(&unparker))
.expect("We never removed ourself from the list")
{
return true;
}
}
self.0.wait_internal(Some(deadline))
}
}

Expand All @@ -595,10 +541,8 @@ impl EventListener {
/// assert!(listener1.discard());
/// assert!(!listener2.discard());
/// ```
pub fn discard(mut self) -> bool {
self.inner
.remove(&mut self.state, false)
.map_or(false, |state| state.is_notified())
pub fn discard(self) -> bool {
self.0.discard()
}

/// Returns `true` if this listener listens to the given `Event`.
Expand All @@ -615,7 +559,7 @@ impl EventListener {
/// ```
#[inline]
pub fn listens_to(&self, event: &Event) -> bool {
ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire))
ptr::eq::<Inner>(&**self.inner(), event.inner.load(Ordering::Acquire))
}

/// Returns `true` if both listeners listen to the same `Event`.
Expand All @@ -632,27 +576,112 @@ impl EventListener {
/// assert!(listener1.same_event(&listener2));
/// ```
pub fn same_event(&self, other: &EventListener) -> bool {
ptr::eq::<Inner>(&*self.inner, &*other.inner)
ptr::eq::<Inner>(&**self.inner(), &**other.inner())
}
}

impl fmt::Debug for EventListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("EventListener { .. }")
fn listener(&mut self) -> &mut Listener<Arc<Inner>> {
&mut self.0
}

fn inner(&self) -> &Arc<Inner> {
&self.0.event
}
}

impl Future for EventListener {
type Output = ();

#[allow(unreachable_patterns)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.listener().poll_internal(cx)
}
}

struct Listener<B: Borrow<Inner> + Unpin> {
/// The reference to the original event.
event: B,

/// The inner state of the listener.
listener: sys::Listener,
}

unsafe impl<B: Borrow<Inner> + Unpin + Send> Send for Listener<B> {}
unsafe impl<B: Borrow<Inner> + Unpin + Sync> Sync for Listener<B> {}

impl<B: Borrow<Inner> + Unpin> Listener<B> {
/// Wait until the provided deadline.
#[cfg(feature = "std")]
fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
let (parker, unparker) = parking::pair();

// Set the listener's state to `Task`.
match self
.event
.borrow()
.register(&mut self.listener, TaskRef::Unparker(&unparker))
{
Some(true) => {
// We were already notified, so we don't need to park.
return true;
}

Some(false) => {
// We're now waiting for a notification.
}

None => {
// We were never inserted into the list.
panic!("listener was never inserted into the list");
}
}

// Wait until a notification is received or the timeout is reached.
loop {
match deadline {
None => parker.park(),

Some(deadline) => {
// Make sure we're not timed out already.
let now = Instant::now();
if now >= deadline {
// Remove our entry and check if we were notified.
return self
.event
.borrow()
.remove(&mut self.listener, false)
.expect("We never removed ourself from the list")
.is_notified();
}
}
}

// See if we were notified.
if self
.event
.borrow()
.register(&mut self.listener, TaskRef::Unparker(&unparker))
.expect("We never removed ourself from the list")
{
return true;
}
}
}

/// Drops this listener and discards its notification (if any) without notifying another
/// active listener.
fn discard(mut self) -> bool {
self.event
.borrow()
.remove(&mut self.listener, false)
.map_or(false, |state| state.is_notified())
}

/// Poll this listener for a notification.
fn poll_internal(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Try to register the listener.
match this
.inner
.register(&mut this.state, TaskRef::Waker(cx.waker()))
match self
.event
.borrow()
.register(&mut self.listener, TaskRef::Waker(cx.waker()))
{
Some(true) => {
// We were already notified, so we don't need to park.
Expand All @@ -672,9 +701,10 @@ impl Future for EventListener {
}
}

impl Drop for EventListener {
impl<B: Borrow<Inner> + Unpin> Drop for Listener<B> {
fn drop(&mut self) {
self.inner.remove(&mut self.state, true);
// If we're being dropped, we need to remove ourself from the list.
self.event.borrow().remove(&mut self.listener, true);
}
}

Expand Down

0 comments on commit de1f13a

Please sign in to comment.