Skip to content

Commit

Permalink
Replace Listener with Option<sys::Listener>
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Mar 31, 2023
1 parent de1f13a commit 09ded13
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 41 deletions.
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Event {
// Register the listener.
let mut listener = EventListener(Listener {
event: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
listener: sys::Listener::Discarded,
listener: None,
});

listener.0.event.insert(&mut listener.0.listener);
Expand Down Expand Up @@ -510,10 +510,10 @@ impl EventListener {
/// use event_listener::Event;
///
/// let event = Event::new();
/// let mut listener = event.listen();
/// let listener = event.listen();
///
/// // There are no notification so this times out.
/// assert!(!listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)));
/// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
/// ```
#[cfg(feature = "std")]
pub fn wait_deadline(self, deadline: Instant) -> bool {
Expand Down Expand Up @@ -601,7 +601,7 @@ struct Listener<B: Borrow<Inner> + Unpin> {
event: B,

/// The inner state of the listener.
listener: sys::Listener,
listener: Option<sys::Listener>,
}

unsafe impl<B: Borrow<Inner> + Unpin + Send> Send for Listener<B> {}
Expand Down
61 changes: 31 additions & 30 deletions src/no_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,43 +33,43 @@ impl crate::Inner {
/// Add a new listener to the list.
///
/// Does nothing if the list is already registered.
pub(crate) fn insert(&self, listener: &mut Listener) {
if let Listener::HasNode(_) | Listener::Queued(_) = *listener {
pub(crate) fn insert(&self, listener: &mut Option<Listener>) {
if listener.is_some() {
// Already inserted.
return;
}

match self.try_lock() {
Some(mut lock) => {
let key = lock.insert(State::Created);
*listener = Listener::HasNode(key);
*listener = Some(Listener::HasNode(key));
}

None => {
// Push it to the queue.
let (node, task_waiting) = Node::listener();
self.list.queue.push(node);
*listener = Listener::Queued(task_waiting);
*listener = Some(Listener::Queued(task_waiting));
}
}
}

/// Remove a listener from the list.
pub(crate) fn remove(&self, listener: &mut Listener, propogate: bool) -> Option<State> {
let state = match mem::replace(listener, Listener::Discarded) {
Listener::HasNode(key) => {
pub(crate) fn remove(&self, listener: &mut Option<Listener>, propogate: bool) -> Option<State> {
let state = match listener.take() {
Some(Listener::HasNode(key)) => {
match self.try_lock() {
Some(mut list) => {
// Fast path removal.
list.remove(Listener::HasNode(key), propogate)
list.remove(key, propogate)
}

None => {
// Slow path removal.
// This is why intrusive lists don't work on no_std.
let node = Node::RemoveListener {
key,
propagate: propogate,
listener: Listener::HasNode(key),
};

self.list.queue.push(node);
Expand All @@ -79,12 +79,12 @@ impl crate::Inner {
}
}

Listener::Queued(_) => {
Some(Listener::Queued(_)) => {
// This won't be added after we drop the lock.
None
}

_ => None,
None => None,
};

state
Expand Down Expand Up @@ -115,11 +115,15 @@ impl crate::Inner {
///
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
/// isn't inserted, returns `None`.
pub(crate) fn register(&self, listener: &mut Listener, task: TaskRef<'_>) -> Option<bool> {
pub(crate) fn register(
&self,
listener: &mut Option<Listener>,
task: TaskRef<'_>,
) -> Option<bool> {
loop {
match mem::replace(listener, Listener::Discarded) {
Listener::HasNode(key) => {
*listener = Listener::HasNode(key);
match listener.take() {
Some(Listener::HasNode(key)) => {
*listener = Some(Listener::HasNode(key));
match self.try_lock() {
Some(mut guard) => {
// Fast path registration.
Expand All @@ -135,18 +139,18 @@ impl crate::Inner {
}
}

Listener::Queued(task_waiting) => {
Some(Listener::Queued(task_waiting)) => {
// Are we done yet?
match task_waiting.status() {
Some(key) => {
// We're inserted now, adjust state.
*listener = Listener::HasNode(key);
*listener = Some(Listener::HasNode(key));
}

None => {
// We're still queued, so register the task.
task_waiting.register(task.into_task());
*listener = Listener::Queued(task_waiting);
*listener = Some(Listener::Queued(task_waiting));
return None;
}
}
Expand Down Expand Up @@ -476,12 +480,7 @@ impl ListenerSlab {
}

/// Removes an entry from the list and returns its state.
pub(crate) fn remove(&mut self, listener: Listener, propogate: bool) -> Option<State> {
let key = match listener {
Listener::HasNode(key) => key,
_ => return None,
};

pub(crate) fn remove(&mut self, key: NonZeroUsize, propogate: bool) -> Option<State> {
let entry = &self.listeners[key.get()];
let prev = entry.prev().get();
let next = entry.next().get();
Expand Down Expand Up @@ -569,9 +568,13 @@ impl ListenerSlab {
///
/// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
/// isn't inserted, returns `None`.
pub(crate) fn register(&mut self, listener: &mut Listener, task: TaskRef<'_>) -> Option<bool> {
pub(crate) fn register(
&mut self,
listener: &mut Option<Listener>,
task: TaskRef<'_>,
) -> Option<bool> {
let key = match *listener {
Listener::HasNode(key) => key,
Some(Listener::HasNode(key)) => key,
_ => return None,
};

Expand All @@ -581,7 +584,8 @@ impl ListenerSlab {
match entry.state().replace(State::NotifiedTaken) {
State::Notified(_) | State::NotifiedTaken => {
// The listener was already notified, so we don't need to do anything.
self.remove(mem::replace(listener, Listener::Discarded), false)?;
self.remove(key, false)?;
*listener = None;
Some(true)
}

Expand Down Expand Up @@ -609,9 +613,6 @@ pub(crate) enum Listener {
/// The listener has a node inside of the linked list.
HasNode(NonZeroUsize),

/// The listener has already been notified and has discarded its entry.
Discarded,

/// The listener has an entry in the queue that may or may not have a task waiting.
Queued(Arc<TaskWaiting>),
}
Expand Down
11 changes: 4 additions & 7 deletions src/no_std/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The node that makes up queues.

use super::{Listener, ListenerSlab};
use super::ListenerSlab;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::{State, Task};
Expand Down Expand Up @@ -32,7 +32,7 @@ pub(crate) enum Node {
/// This node is removing a listener.
RemoveListener {
/// The ID of the listener to remove.
listener: Listener,
key: NonZeroUsize,

/// Whether to propagate notifications to the next listener.
propagate: bool,
Expand Down Expand Up @@ -83,12 +83,9 @@ impl Node {
// Notify the listener.
list.notify(count, additional);
}
Node::RemoveListener {
listener,
propagate,
} => {
Node::RemoveListener { key, propagate } => {
// Remove the listener from the list.
list.remove(listener, propagate);
list.remove(key, propagate);
}
Node::Waiting(task) => {
return Some(task);
Expand Down

0 comments on commit 09ded13

Please sign in to comment.