Skip to content

Commit

Permalink
Merge pull request #130 from kyrias/make-remaining-core-spinlock-opt-in
Browse files Browse the repository at this point in the history
Make remaining core channel spinlock opt-in
  • Loading branch information
zesterer authored Aug 16, 2023
2 parents 80d19c4 + acc7ade commit 431524d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
use crate::*;
use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
use futures_sink::Sink;
use spin1::Mutex as Spinlock;

struct AsyncSignal {
waker: Spinlock<Waker>,
Expand Down
68 changes: 50 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use std::{
fmt,
};

#[cfg(feature = "spin")]
use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard};
use crate::signal::{Signal, SyncSignal};

Expand Down Expand Up @@ -256,55 +257,86 @@ enum TryRecvTimeoutError {
}

// TODO: Investigate some sort of invalidation flag for timeouts
#[cfg(feature = "spin")]
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);

#[cfg(not(feature = "spin"))]
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);

#[cfg(feature = "spin")]
impl<T, S: ?Sized + Signal> Hook<T, S> {
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> where S: Sized {
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
where
S: Sized,
{
Arc::new(Self(Some(Spinlock::new(msg)), signal))
}

pub fn trigger(signal: S) -> Arc<Self> where S: Sized {
Arc::new(Self(None, signal))
fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
self.0.as_ref().map(|s| s.lock())
}
}

pub fn signal(&self) -> &S {
&self.1
#[cfg(not(feature = "spin"))]
impl<T, S: ?Sized + Signal> Hook<T, S> {
pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
where
S: Sized,
{
Arc::new(Self(Some(Mutex::new(msg)), signal))
}

pub fn fire_nothing(&self) -> bool {
self.signal().fire()
fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> {
self.0.as_ref().map(|s| s.lock().unwrap())
}
}

impl<T, S: ?Sized + Signal> Hook<T, S> {
pub fn fire_recv(&self) -> (T, &S) {
let msg = self.0.as_ref().unwrap().lock().take().unwrap();
let msg = self.lock().unwrap().take().unwrap();
(msg, self.signal())
}

pub fn fire_send(&self, msg: T) -> (Option<T>, &S) {
let ret = match &self.0 {
Some(hook) => {
*hook.lock() = Some(msg);
let ret = match self.lock() {
Some(mut lock) => {
*lock = Some(msg);
None
},
}
None => Some(msg),
};
(ret, self.signal())
}

pub fn is_empty(&self) -> bool {
self.0.as_ref().map(|s| s.lock().is_none()).unwrap_or(true)
self.lock().map(|s| s.is_none()).unwrap_or(true)
}

pub fn try_take(&self) -> Option<T> {
self.0.as_ref().and_then(|s| s.lock().take())
self.lock().unwrap().take()
}

pub fn trigger(signal: S) -> Arc<Self>
where
S: Sized,
{
Arc::new(Self(None, signal))
}

pub fn signal(&self) -> &S {
&self.1
}

pub fn fire_nothing(&self) -> bool {
self.signal().fire()
}
}

impl<T> Hook<T, SyncSignal> {
pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
loop {
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
let msg = self.0.as_ref().unwrap().lock().take();
let msg = self.lock().unwrap().take();
if let Some(msg) = msg {
break Some(msg);
} else if disconnected {
Expand All @@ -319,7 +351,7 @@ impl<T> Hook<T, SyncSignal> {
pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
loop {
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
let msg = self.0.as_ref().unwrap().lock().take();
let msg = self.lock().unwrap().take();
if let Some(msg) = msg {
break Ok(msg);
} else if disconnected {
Expand All @@ -335,7 +367,7 @@ impl<T> Hook<T, SyncSignal> {
pub fn wait_send(&self, abort: &AtomicBool) {
loop {
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
if disconnected || self.0.as_ref().unwrap().lock().is_none() {
if disconnected || self.lock().unwrap().is_none() {
break;
}

Expand All @@ -347,7 +379,7 @@ impl<T> Hook<T, SyncSignal> {
pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
loop {
let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
if self.0.as_ref().unwrap().lock().is_none() {
if self.lock().unwrap().is_none() {
break Ok(());
} else if disconnected {
break Err(false);
Expand Down
1 change: 1 addition & 0 deletions src/select.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Types that permit waiting upon multiple blocking operations using the [`Selector`] interface.

use crate::*;
use spin1::Mutex as Spinlock;
use std::{any::Any, marker::PhantomData};

#[cfg(feature = "eventual-fairness")]
Expand Down

0 comments on commit 431524d

Please sign in to comment.