Skip to content

Commit

Permalink
Replace EventLoop notify with new channel impl
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed May 25, 2016
1 parent 9964a56 commit 88494d5
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 100 deletions.
140 changes: 128 additions & 12 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use lazy::{Lazy, AtomicLazy};
use std::sync::{mpsc, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
pub struct SenderCtl {
inner: Arc<Inner>,
}
Expand All @@ -15,26 +16,61 @@ pub struct ReceiverCtl {
}

pub struct Sender<T> {
tx: mpsc::Sender<T>,
tx: StdSender<T>,
ctl: SenderCtl,
}

enum StdSender<T> {
Bounded(mpsc::SyncSender<T>),
Unbounded(mpsc::Sender<T>),
}

pub struct Receiver<T> {
rx: mpsc::Receiver<T>,
ctl: ReceiverCtl,
}

#[derive(Debug)]
pub enum SendError<T> {
Io(io::Error),
Disconnected(T),
}

#[derive(Debug)]
pub enum TrySendError<T> {
Io(io::Error),
Full(T),
Disconnected(T),
}

struct Inner {
pending: AtomicUsize,
set_readiness: AtomicLazy<SetReadiness>,
}

pub fn from_std<T>((tx, rx): (mpsc::Sender<T>, mpsc::Receiver<T>)) -> (Sender<T>, Receiver<T>)
pub fn from_std_channel<T>((tx, rx): (mpsc::Sender<T>, mpsc::Receiver<T>)) -> (Sender<T>, Receiver<T>)
{
let (tx_ctl, rx_ctl) = ctl_pair();

let tx = Sender {
tx: tx,
tx: StdSender::Unbounded(tx),
ctl: tx_ctl,
};

let rx = Receiver {
rx: rx,
ctl: rx_ctl,
};

(tx, rx)
}

pub fn from_std_sync_channel<T>((tx, rx): (mpsc::SyncSender<T>, mpsc::Receiver<T>)) -> (Sender<T>, Receiver<T>)
{
let (tx_ctl, rx_ctl) = ctl_pair();

let tx = Sender {
tx: StdSender::Bounded(tx),
ctl: tx_ctl,
};

Expand Down Expand Up @@ -66,25 +102,27 @@ pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {

impl SenderCtl {
/// Call to track that a message has been sent
pub fn inc(&self) {
pub fn inc(&self) -> io::Result<()> {
if 0 == self.inner.pending.fetch_add(1, Ordering::Acquire) {
// Toggle readiness to readable
if let Some(set_readiness) = self.inner.set_readiness.as_ref() {
// TODO: Don't ignore result
let _ = set_readiness.set_readiness(EventSet::readable());
try!(set_readiness.set_readiness(EventSet::readable()));
}
}

Ok(())
}
}

impl ReceiverCtl {
pub fn dec(&self) {
pub fn dec(&self) -> io::Result<()> {
let first = self.inner.pending.load(Ordering::Acquire);

if first == 1 {
// Unset readiness
if let Some(set_readiness) = self.inner.set_readiness.as_ref() {
let _ = set_readiness.set_readiness(EventSet::none());
try!(set_readiness.set_readiness(EventSet::none()));
}
}

Expand All @@ -95,9 +133,11 @@ impl ReceiverCtl {
// There are still pending messages. Since readiness was
// previously unset, it must be reset here
if let Some(set_readiness) = self.inner.set_readiness.as_ref() {
let _ = set_readiness.set_readiness(EventSet::none());
try!(set_readiness.set_readiness(EventSet::none()));
}
}

Ok(())
}
}

Expand Down Expand Up @@ -137,17 +177,60 @@ impl Evented for ReceiverCtl {
}

impl<T> Sender<T> {
pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
self.tx.send(t).map(|_| self.ctl.inc())
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.tx.send(t).and_then(|_| {
try!(self.ctl.inc());
Ok(())
})
}

pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.tx.try_send(t).and_then(|_| {
try!(self.ctl.inc());
Ok(())
})
}
}

impl<T> Receiver<T> {
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender {
tx: self.tx.clone(),
ctl: self.ctl.clone(),
}
}
}

impl<T> StdSender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
match *self {
StdSender::Bounded(ref tx) => tx.send(t).map_err(SendError::from),
StdSender::Unbounded(ref tx) => tx.send(t).map_err(SendError::from),
}
}

pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
match *self {
StdSender::Bounded(ref tx) => tx.try_send(t).map_err(TrySendError::from),
StdSender::Unbounded(ref tx) => tx.send(t).map_err(TrySendError::from),
}
}
}

impl<T> Clone for StdSender<T> {
fn clone(&self) -> StdSender<T> {
match *self {
StdSender::Bounded(ref v) => StdSender::Bounded(v.clone()),
StdSender::Unbounded(ref v) => StdSender::Unbounded(v.clone()),
}
}
}

impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
// TODO: When there is no value to recv, unset readiness
self.rx.try_recv().and_then(|res| {
self.ctl.dec();
let _ = self.ctl.dec();
Ok(res)
})
}
Expand All @@ -166,3 +249,36 @@ impl<T> Evented for Receiver<T> {
self.ctl.deregister(poll)
}
}

impl<T> From<mpsc::SendError<T>> for SendError<T> {
fn from(src: mpsc::SendError<T>) -> SendError<T> {
SendError::Disconnected(src.0)
}
}

impl<T> From<io::Error> for SendError<T> {
fn from(src: io::Error) -> SendError<T> {
SendError::Io(src)
}
}

impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
match src {
mpsc::TrySendError::Full(v) => TrySendError::Full(v),
mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
}
}
}

impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
TrySendError::Disconnected(src.0)
}
}

impl<T> From<io::Error> for TrySendError<T> {
fn from(src: io::Error) -> TrySendError<T> {
TrySendError::Io(src)
}
}
Loading

0 comments on commit 88494d5

Please sign in to comment.