Skip to content

Commit

Permalink
update signal driver
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Sep 23, 2020
1 parent 2ba0c30 commit f823cf5
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 51 deletions.
22 changes: 19 additions & 3 deletions tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::io::driver::{Direction, ReadyEvent};
use crate::io::driver::{Direction, Handle, ReadyEvent};
use crate::io::registration::Registration;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};

Expand Down Expand Up @@ -90,6 +90,7 @@ where
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
Expand Down Expand Up @@ -118,8 +119,17 @@ where
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Self::new_with_ready_and_handle(io, ready, Handle::current())
}

pub(crate) fn new_with_ready_and_handle(
io: E,
ready: mio::Ready,
handle: Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?;
Ok(Self {
io: Some(io),
registration,
Expand All @@ -128,7 +138,13 @@ where

/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
#[cfg(any(feature = "process", feature = "tcp", feature = "udp", feature = "uds",))]
#[cfg(any(
feature = "process",
feature = "tcp",
feature = "udp",
feature = "uds",
feature = "signal"
))]
pub(crate) fn get_ref(&self) -> &E {
self.io.as_ref().unwrap()
}
Expand Down
19 changes: 1 addition & 18 deletions tokio/src/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ cfg_io_driver! {
/// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
pub(super) struct Registration {
pub(crate) struct Registration {
/// Handle to the associated driver.
handle: Handle,

Expand Down Expand Up @@ -74,23 +74,6 @@ impl Registration {
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
///
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub(super) fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
{
Self::new_with_ready_and_handle(io, ready, Handle::current())
}

/// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>(
io: &T,
ready: mio::Ready,
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ macro_rules! cfg_io_driver {
}
}

macro_rules! cfg_not_io_driver {
($($item:item)*) => {
$(
#[cfg(not(feature = "io-driver"))]
$item
)*
}
}

macro_rules! cfg_io_readiness {
($($item:item)*) => {
$(
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/net/unix/datagram/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,11 +705,6 @@ impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram {
type Error = io::Error;

/// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> {
value.io.into_inner()
}
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/net/unix/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,6 @@ impl TryFrom<UnixListener> for mio_uds::UnixListener {
type Error = io::Error;

/// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixListener) -> Result<Self, Self::Error> {
value.io.into_inner()
}
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/net/unix/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ impl TryFrom<UnixStream> for mio_uds::UnixStream {
type Error = io::Error;

/// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixStream) -> Result<Self, Self::Error> {
value.io.into_inner()
}
Expand Down
43 changes: 28 additions & 15 deletions tokio/src/signal/unix/driver.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
//! Signal driver
use crate::io::driver::Driver as IoDriver;
use crate::io::Registration;
use crate::io::PollEvented;
use crate::park::Park;
use crate::runtime::context;
use crate::signal::registry::globals;
use mio_uds::UnixStream;
use std::io::{self, Read};
use std::ptr;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

/// Responsible for registering wakeups when an OS signal is received, and
Expand All @@ -21,11 +23,7 @@ pub(crate) struct Driver {
park: IoDriver,

/// A pipe for receiving wake events from the signal handler
receiver: UnixStream,

/// The actual registraiton for `receiver` when active.
/// Lazily bound at the first signal registration.
registration: Registration,
receiver: PollEvented<UnixStream>,

/// Shared state
inner: Arc<Inner>,
Expand Down Expand Up @@ -58,13 +56,12 @@ impl Driver {
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
let receiver = globals().receiver.try_clone()?;
let registration =
Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?;
let receiver =
PollEvented::new_with_ready_and_handle(receiver, mio::Ready::all(), park.handle())?;

Ok(Self {
park,
receiver,
registration,
inner: Arc::new(Inner(())),
})
}
Expand All @@ -79,29 +76,45 @@ impl Driver {

fn process(&self) {
// Check if the pipe is ready to read and therefore has "woken" us up
match self.registration.take_read_ready() {
Ok(Some(ready)) => assert!(ready.is_readable()),
Ok(None) => return, // No wake has arrived, bail
Err(e) => panic!("reactor gone: {}", e),
}
//
// To do so, we will `poll_read_ready` with a noop waker, since we don't
// need to actually be notified when read ready...
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
let mut cx = Context::from_waker(&waker);

let ev = match self.receiver.poll_read_ready(&mut cx) {
Poll::Ready(Ok(ev)) => ev,
Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
Poll::Pending => return, // No wake has arrived, bail
};

// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
loop {
match (&self.receiver).read(&mut buf) {
match self.receiver.get_ref().read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {}", e),
}
}

self.receiver.clear_readiness(ev);

// Broadcast any signals which were received
globals().broadcast();
}
}

const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

unsafe fn noop_clone(_data: *const ()) -> RawWaker {
RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

unsafe fn noop(_data: *const ()) {}

// ===== impl Park for Driver =====

impl Park for Driver {
Expand Down

0 comments on commit f823cf5

Please sign in to comment.