Skip to content

Commit

Permalink
feat: add parking lot compatable api
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 5, 2021
1 parent 7520fbd commit 4190d1d
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 7 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ examples:
features-check:
# remove yamux default features
sed -i 's/"tokio-timer"//g' yamux/Cargo.toml
$(Change_Work_Path) && cargo build --features parking_lot
$(Change_Work_Path) && cargo build --features unstable
$(Change_Work_Path) && cargo build --features tokio-runtime,generic-timer,unstable --no-default-features
$(Change_Work_Path) && cargo build --features async-runtime,generic-timer,unstable --no-default-features
Expand Down
2 changes: 2 additions & 0 deletions tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ tokio-util = { version = "0.6.0", features = ["codec"] }
log = "0.4"
bytes = "1.0.0"
thiserror = "1.0"
once_cell = "1.0"
parking_lot = { version = "0.11", optional = true }
tokio-tungstenite = { version = "0.13", optional = true }
futures-timer = { version = "3.0.2", optional = true }
async-std = { version = "1", features = ["unstable"], optional = true }
Expand Down
11 changes: 6 additions & 5 deletions tentacle/src/channel/bound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::channel::{
decode_state, encode_state, queue::Queue, Priority, SendError, SendErrorKind, TryRecvError,
TrySendError, INIT_STATE, MAX_BUFFER, MAX_CAPACITY, OPEN_MASK,
};
use crate::lock::Mutex;

use futures::{
stream::{FusedStream, Stream},
Expand All @@ -14,7 +15,7 @@ use std::{
AtomicBool, AtomicUsize,
Ordering::{Relaxed, SeqCst},
},
Arc, Mutex,
Arc,
},
task::{Context, Poll},
};
Expand Down Expand Up @@ -256,7 +257,7 @@ impl<T> BoundedSenderInner<T> {

fn park(&self) {
{
let mut sender = self.sender_task.lock().unwrap();
let mut sender = self.sender_task.lock();
sender.task = None;
sender.is_parked = true;
}
Expand Down Expand Up @@ -326,7 +327,7 @@ impl<T> BoundedSenderInner<T> {
// lock in most cases
if self.maybe_parked.load(Relaxed) {
// Get a lock on the task handle
let mut task = self.sender_task.lock().unwrap();
let mut task = self.sender_task.lock();

if !task.is_parked {
self.maybe_parked.store(false, Relaxed);
Expand Down Expand Up @@ -532,7 +533,7 @@ impl<T> Receiver<T> {
// Wake up any threads waiting as they'll see that we've closed the
// channel and will continue on their merry way.
while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
task.lock().unwrap().notify();
task.lock().notify();
}
}
}
Expand Down Expand Up @@ -608,7 +609,7 @@ impl<T> Receiver<T> {
fn unpark_one(&mut self) {
if let Some(inner) = &mut self.inner {
if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
task.lock().unwrap().notify();
task.lock().notify();
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions tentacle/src/channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::channel::mpsc;
use crate::lock::Mutex;
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{poll_fn, FutureExt};
Expand All @@ -8,7 +9,7 @@ use futures::stream::{Stream, StreamExt};
use futures::task::{Context, Poll};
use futures_test::task::{new_count_waker, noop_context};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread;

trait AssertSend: Send {}
Expand Down Expand Up @@ -283,7 +284,7 @@ fn stress_receiver_multi_task_bounded_hard() {

loop {
i += 1;
let mut rx_opt = rx.lock().unwrap();
let mut rx_opt = rx.lock();
if let Some(rx) = &mut *rx_opt {
if i % 5 == 0 {
let item = block_on(rx.next());
Expand Down
1 change: 1 addition & 0 deletions tentacle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub mod builder;
pub mod context;
/// Error
pub mod error;
pub(crate) mod lock;
/// Protocol handle callback stream
pub(crate) mod protocol_handle_stream;
/// Protocol select
Expand Down
9 changes: 9 additions & 0 deletions tentacle/src/lock/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#![allow(dead_code)]

#[cfg(feature = "parking_lot")]
pub use parking_lot::{const_fair_mutex, const_mutex, const_rwlock, FairMutex, Mutex, RwLock};
#[cfg(not(feature = "parking_lot"))]
pub mod native;

#[cfg(not(feature = "parking_lot"))]
pub use native::{Mutex, RwLock};
75 changes: 75 additions & 0 deletions tentacle/src/lock/native.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::sync::{self, MutexGuard, RwLockReadGuard, RwLockWriteGuard, TryLockError};

/// Adapter for `std::Mutex` that removes the poisoning aspects
// from its api
#[derive(Debug)]
pub struct Mutex<T: ?Sized>(sync::Mutex<T>);

impl<T> Mutex<T> {
#[inline]
pub fn new(t: T) -> Mutex<T> {
Mutex(sync::Mutex::new(t))
}

#[inline]
pub fn lock(&self) -> MutexGuard<'_, T> {
match self.0.lock() {
Ok(guard) => guard,
Err(p_err) => p_err.into_inner(),
}
}

#[inline]
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
match self.0.try_lock() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}
}

/// Adapter for `std::RwLock` that removes the poisoning aspects
// from its api
pub struct RwLock<T: ?Sized>(sync::RwLock<T>);

impl<T> RwLock<T> {
#[inline]
pub fn new(t: T) -> RwLock<T> {
RwLock(sync::RwLock::new(t))
}

#[inline]
pub fn read(&self) -> RwLockReadGuard<'_, T> {
match self.0.read() {
Ok(guard) => guard,
Err(p_err) => p_err.into_inner(),
}
}

#[inline]
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
match self.0.try_read() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}

#[inline]
pub fn write(&self) -> RwLockWriteGuard<'_, T> {
match self.0.write() {
Ok(guard) => guard,
Err(p_err) => p_err.into_inner(),
}
}

#[inline]
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
match self.0.try_write() {
Ok(guard) => Some(guard),
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
Err(TryLockError::WouldBlock) => None,
}
}
}

0 comments on commit 4190d1d

Please sign in to comment.