Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom readiness queue #371

Merged
merged 1 commit into from
May 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 282 additions & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
#![allow(unused_imports)]

use {io, Evented, EventSet, Poll, PollOpt, Registration, SetReadiness, Token};
use lazy::{Lazy, AtomicLazy};
use std::sync::{mpsc, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
pub struct SenderCtl {
inner: Arc<Inner>,
}

pub struct ReceiverCtl {
registration: Lazy<Registration>,
inner: Arc<Inner>,
}

pub struct Sender<T> {
tx: StdSender<T>,
ctl: SenderCtl,
}

enum StdSender<T> {
Bounded(mpsc::SyncSender<T>),
Unbounded(mpsc::Sender<T>),
}

pub struct Receiver<T> {
rx: mpsc::Receiver<T>,
ctl: ReceiverCtl,
}

#[derive(Debug)]
pub enum SendError<T> {
Io(io::Error),
Disconnected(T),
}

#[derive(Debug)]
pub enum TrySendError<T> {
Io(io::Error),
Full(T),
Disconnected(T),
}

struct Inner {
pending: AtomicUsize,
set_readiness: AtomicLazy<SetReadiness>,
}

pub fn from_std_channel<T>((tx, rx): (mpsc::Sender<T>, mpsc::Receiver<T>)) -> (Sender<T>, Receiver<T>)
{
let (tx_ctl, rx_ctl) = ctl_pair();

let tx = Sender {
tx: StdSender::Unbounded(tx),
ctl: tx_ctl,
};

let rx = Receiver {
rx: rx,
ctl: rx_ctl,
};

(tx, rx)
}

pub fn from_std_sync_channel<T>((tx, rx): (mpsc::SyncSender<T>, mpsc::Receiver<T>)) -> (Sender<T>, Receiver<T>)
{
let (tx_ctl, rx_ctl) = ctl_pair();

let tx = Sender {
tx: StdSender::Bounded(tx),
ctl: tx_ctl,
};

let rx = Receiver {
rx: rx,
ctl: rx_ctl,
};

(tx, rx)
}

pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
let inner = Arc::new(Inner {
pending: AtomicUsize::new(0),
set_readiness: AtomicLazy::new(),
});

let tx = SenderCtl {
inner: inner.clone(),
};

let rx = ReceiverCtl {
registration: Lazy::new(),
inner: inner,
};

(tx, rx)
}

impl SenderCtl {
/// Call to track that a message has been sent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth documenting what can be done in the case of an error? E.g. the readiness is not set, but the increment still happened

pub fn inc(&self) -> io::Result<()> {
if 0 == self.inner.pending.fetch_add(1, Ordering::Acquire) {
// Toggle readiness to readable
if let Some(set_readiness) = self.inner.set_readiness.as_ref() {
try!(set_readiness.set_readiness(EventSet::readable()));
}
}

Ok(())
}
}

impl ReceiverCtl {
pub fn dec(&self) -> io::Result<()> {
let first = self.inner.pending.load(Ordering::Acquire);

if first == 1 {
// Unset readiness
if let Some(set_readiness) = self.inner.set_readiness.as_ref() {
try!(set_readiness.set_readiness(EventSet::none()));
}
}

// Decrement
let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);

if first == 1 && second > 0 {
// There are still pending messages. Since readiness was
// previously unset, it must be reset here
if let Some(set_readiness) = self.inner.set_readiness.as_ref() {
try!(set_readiness.set_readiness(EventSet::none()));
}
}

Ok(())
}
}

impl Evented for ReceiverCtl {
fn register(&self, poll: &Poll, token: Token, interest: EventSet, opts: PollOpt) -> io::Result<()> {
if self.registration.is_some() {
return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
}

let (registration, set_readiness) = Registration::new(poll, token, interest, opts);


if self.inner.pending.load(Ordering::Relaxed) > 0 {
// TODO: Don't drop readiness
let _ = set_readiness.set_readiness(EventSet::readable());
}

self.registration.set(registration).ok().expect("unexpected state encountered");
self.inner.set_readiness.set(set_readiness).ok().expect("unexpected state encountered");

Ok(())
}

fn reregister(&self, poll: &Poll, token: Token, interest: EventSet, opts: PollOpt) -> io::Result<()> {
match self.registration.as_ref() {
Some(registration) => registration.update(poll, token, interest, opts),
None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
}
}

fn deregister(&self, poll: &Poll) -> io::Result<()> {
match self.registration.as_ref() {
Some(registration) => registration.deregister(poll),
None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
}
}
}

impl<T> Sender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.tx.send(t).and_then(|_| {
try!(self.ctl.inc());
Ok(())
})
}

pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.tx.try_send(t).and_then(|_| {
try!(self.ctl.inc());
Ok(())
})
}
}

impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender {
tx: self.tx.clone(),
ctl: self.ctl.clone(),
}
}
}

impl<T> StdSender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
match *self {
StdSender::Bounded(ref tx) => tx.send(t).map_err(SendError::from),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be using try_send?

StdSender::Unbounded(ref tx) => tx.send(t).map_err(SendError::from),
}
}

pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
match *self {
StdSender::Bounded(ref tx) => tx.try_send(t).map_err(TrySendError::from),
StdSender::Unbounded(ref tx) => tx.send(t).map_err(TrySendError::from),
}
}
}

impl<T> Clone for StdSender<T> {
fn clone(&self) -> StdSender<T> {
match *self {
StdSender::Bounded(ref v) => StdSender::Bounded(v.clone()),
StdSender::Unbounded(ref v) => StdSender::Unbounded(v.clone()),
}
}
}

impl<T> Receiver<T> {
pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
self.rx.try_recv().and_then(|res| {
let _ = self.ctl.dec();
Ok(res)
})
}
}

impl<T> Evented for Receiver<T> {
fn register(&self, poll: &Poll, token: Token, interest: EventSet, opts: PollOpt) -> io::Result<()> {
self.ctl.register(poll, token, interest, opts)
}

fn reregister(&self, poll: &Poll, token: Token, interest: EventSet, opts: PollOpt) -> io::Result<()> {
self.ctl.reregister(poll, token, interest, opts)
}

fn deregister(&self, poll: &Poll) -> io::Result<()> {
self.ctl.deregister(poll)
}
}

impl<T> From<mpsc::SendError<T>> for SendError<T> {
fn from(src: mpsc::SendError<T>) -> SendError<T> {
SendError::Disconnected(src.0)
}
}

impl<T> From<io::Error> for SendError<T> {
fn from(src: io::Error) -> SendError<T> {
SendError::Io(src)
}
}

impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
match src {
mpsc::TrySendError::Full(v) => TrySendError::Full(v),
mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
}
}
}

impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
TrySendError::Disconnected(src.0)
}
}

impl<T> From<io::Error> for TrySendError<T> {
fn from(src: io::Error) -> TrySendError<T> {
TrySendError::Io(src)
}
}
43 changes: 41 additions & 2 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ impl EventSet {
EventSet(0x008)
}

// Private
#[inline]
fn drop() -> EventSet {
EventSet(0x10)
}

#[inline]
pub fn all() -> EventSet {
EventSet::readable() |
Expand All @@ -170,7 +176,7 @@ impl EventSet {

#[inline]
pub fn is_none(&self) -> bool {
self.0 == 0
(*self & !EventSet::drop()) == EventSet::none()
}

#[inline]
Expand Down Expand Up @@ -266,7 +272,10 @@ impl fmt::Debug for EventSet {
(EventSet::readable(), "Readable"),
(EventSet::writable(), "Writable"),
(EventSet::error(), "Error"),
(EventSet::hup(), "Hup")];
(EventSet::hup(), "Hup"),
(EventSet::drop(), "Drop")];

try!(write!(fmt, "EventSet {{"));

for &(flag, msg) in &flags {
if self.contains(flag) {
Expand All @@ -277,6 +286,8 @@ impl fmt::Debug for EventSet {
}
}

try!(write!(fmt, "}}"));

Ok(())
}
}
Expand Down Expand Up @@ -312,6 +323,34 @@ impl Event {
}
}

/*
*
* ===== Mio internal helpers =====
*
*/

pub fn as_usize(events: EventSet) -> usize {
events.0
}

pub fn from_usize(events: usize) -> EventSet {
EventSet(events)
}

/// Returns true if the `EventSet` does not have any public OR private flags
/// set.
pub fn is_empty(events: EventSet) -> bool {
events.0 == 0
}

pub fn is_drop(events: EventSet) -> bool {
events.contains(EventSet::drop())
}

pub fn drop() -> EventSet {
EventSet::drop()
}

// Used internally to mutate an `Event` in place
// Not used on all platforms
#[allow(dead_code)]
Expand Down
Loading