diff --git a/Cargo.toml b/Cargo.toml index f5459bf84..ac7d351e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,8 @@ test = [] [dependencies] crossbeam-channel = "0.3.6" -futures-core-preview = "0.3.0-alpha.12" -futures-io-preview = "0.3.0-alpha.12" +futures-core-preview = "0.3.0-alpha.13" +futures-io-preview = "0.3.0-alpha.13" log = "0.4.6" mio-st = "0.2.1" num_cpus = "1.8.0" @@ -32,6 +32,6 @@ slab = "0.4.0" std-logger = "0.3.2" [dev-dependencies] -futures-test-preview = "0.3.0-alpha.12" -futures-util-preview = "0.3.0-alpha.12" +futures-test-preview = "0.3.0-alpha.13" +futures-util-preview = "0.3.0-alpha.13" lazy_static = "1.1.0" diff --git a/src/actor/context.rs b/src/actor/context.rs index aad208ca9..ff8073f30 100644 --- a/src/actor/context.rs +++ b/src/actor/context.rs @@ -3,7 +3,7 @@ use std::future::Future; use std::ops::DerefMut; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use crate::actor_ref::{ActorRef, Local}; use crate::mailbox::MailBox; @@ -245,7 +245,7 @@ impl<'ctx, M, S> Future for ReceiveMessage<'ctx, M, S> { type Output = M; - fn poll(mut self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll { + fn poll(mut self: Pin<&mut Self>, _waker: &Waker) -> Poll { let ReceiveMessage { ref mut inbox, ref mut selector, diff --git a/src/actor/mod.rs b/src/actor/mod.rs index 95a540d5f..34af2e42b 100644 --- a/src/actor/mod.rs +++ b/src/actor/mod.rs @@ -37,7 +37,7 @@ use std::fmt; use std::future::Future; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; mod context; @@ -269,7 +269,7 @@ pub trait Actor { /// /// Just like with [`Future`]s polling after it returned [`Poll::Ready`] may /// cause undefined behaviour, including but not limited to panicking. - fn try_poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll>; + fn try_poll(self: Pin<&mut Self>, waker: &Waker) -> Poll>; } impl Actor for Fut @@ -277,7 +277,7 @@ impl Actor for Fut { type Error = E; - fn try_poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll> { + fn try_poll(self: Pin<&mut Self>, waker: &Waker) -> Poll> { self.poll(waker) } } diff --git a/src/channel/bounded.rs b/src/channel/bounded.rs index 70ca2f468..33c4fcd03 100644 --- a/src/channel/bounded.rs +++ b/src/channel/bounded.rs @@ -7,7 +7,7 @@ use std::collections::VecDeque; use std::future::Future; use std::marker::Unpin; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use futures_core::stream::Stream; @@ -98,7 +98,7 @@ impl Receiver { impl Stream for Receiver { type Item = T; - fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, lw: &Waker) -> Poll> { let this = Pin::get_mut(self); match this.try_receive() { Ok(Some(value)) => Poll::Ready(Some(value)), @@ -122,7 +122,7 @@ pub struct ReceiveOne<'r, T> { impl<'r, T: Unpin> Future for ReceiveOne<'r, T> { type Output = Result; - fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll { let this = Pin::get_mut(self); match this.inner.try_receive() { Ok(Some(value)) => Poll::Ready(Ok(value)), @@ -142,7 +142,7 @@ struct ChannelInner { values: VecDeque, /// Waker set by calling `Receiver.poll_next` or `ReceiveOne.poll` and /// awoken by `Sender.send`, if set. - waker: Option, + waker: Option, } impl ChannelInner { diff --git a/src/channel/oneshot.rs b/src/channel/oneshot.rs index b278cfcad..b9ffabd2b 100644 --- a/src/channel/oneshot.rs +++ b/src/channel/oneshot.rs @@ -77,7 +77,7 @@ use std::future::Future; use std::marker::Unpin; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use crate::channel::{NoReceiver, NoValue}; use crate::util::Shared; @@ -126,7 +126,7 @@ pub struct Receiver { impl Future for Receiver { type Output = Result; - fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll { let this = Pin::get_mut(self); if let Some(value) = this.inner.borrow_mut().value.take() { return Poll::Ready(Ok(value)); @@ -148,7 +148,7 @@ struct ChannelInner { value: Option, /// Waker possibly set by calling `Receiver.poll` and awoken by /// `Sender.send`, if set. - waker: Option, + waker: Option, } /// Creates a new asynchronous one shot channel, returning the sending and diff --git a/src/channel/unbounded.rs b/src/channel/unbounded.rs index c80918d9d..06cb4f721 100644 --- a/src/channel/unbounded.rs +++ b/src/channel/unbounded.rs @@ -4,7 +4,7 @@ use std::collections::VecDeque; use std::future::Future; use std::marker::Unpin; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use futures_core::stream::Stream; @@ -79,7 +79,7 @@ impl Receiver { impl Stream for Receiver { type Item = T; - fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, lw: &Waker) -> Poll> { let this = Pin::get_mut(self); match this.try_receive() { Ok(Some(value)) => Poll::Ready(Some(value)), @@ -103,7 +103,7 @@ pub struct ReceiveOne<'r, T> { impl<'r, T: Unpin> Future for ReceiveOne<'r, T> { type Output = Result; - fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll { let this = Pin::get_mut(self); match this.inner.try_receive() { Ok(Some(value)) => Poll::Ready(Ok(value)), @@ -122,7 +122,7 @@ struct ChannelInner { values: VecDeque, /// Waker set by calling `Receiver.poll_next` or `ReceiveOne.poll` and /// awoken by `Sender.send`, if set. - waker: Option, + waker: Option, } /// Creates a new asynchronous unbounded channel, returning the sending and diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 9458852a7..f1435b303 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -4,7 +4,7 @@ use std::fmt; use std::io::{self, Read, Write}; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use futures_io::{AsyncRead, AsyncWrite, Initializer}; use log::debug; @@ -220,7 +220,7 @@ impl Actor for TcpListener { type Error = TcpListenerError; - fn try_poll(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll> { + fn try_poll(self: Pin<&mut Self>, _waker: &Waker) -> Poll> { // This is safe because only the `ActorSystemRef`, `MioTcpListener` and // the `MailBox` are mutably borrowed and all are `Unpin`. let &mut TcpListener { @@ -407,7 +407,7 @@ impl TcpStream { /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. Successive calls return the same /// data. - pub fn poll_peek(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll> { + pub fn poll_peek(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll> { try_io!(self.inner.peek(buf)) } @@ -435,21 +435,21 @@ impl AsyncRead for TcpStream { Initializer::nop() } - fn poll_read(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll> { + fn poll_read(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll> { try_io!(self.inner.read(buf)) } } impl AsyncWrite for TcpStream { - fn poll_write(&mut self, _waker: &LocalWaker, buf: &[u8]) -> Poll> { + fn poll_write(&mut self, _waker: &Waker, buf: &[u8]) -> Poll> { try_io!(self.inner.write(buf)) } - fn poll_flush(&mut self, _waker: &LocalWaker) -> Poll> { + fn poll_flush(&mut self, _waker: &Waker) -> Poll> { try_io!(self.inner.flush()) } - fn poll_close(&mut self, waker: &LocalWaker) -> Poll> { + fn poll_close(&mut self, waker: &Waker) -> Poll> { self.poll_flush(waker) } } diff --git a/src/net/udp.rs b/src/net/udp.rs index e79b0356e..e0a1031d9 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -2,7 +2,7 @@ use std::io; use std::net::SocketAddr; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use mio_st::net::{ConnectedUdpSocket as MioConnectedUdpSocket, UdpSocket as MioUdpSocket}; use mio_st::poll::PollOption; @@ -40,19 +40,19 @@ impl ConnectedUdpSocket { /// Sends data on the socket to the connected socket. On success, returns /// the number of bytes written. - pub fn poll_send(&mut self, _waker: &LocalWaker, buf: &[u8]) -> Poll> { + pub fn poll_send(&mut self, _waker: &Waker, buf: &[u8]) -> Poll> { try_io!(self.socket.send(buf)) } /// Receives data from the socket. On success, returns the number of bytes /// read. - pub fn poll_recv(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll> { + pub fn poll_recv(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll> { try_io!(self.socket.recv(buf)) } /// Receives data from the socket, without removing it from the input queue. /// On success, returns the number of bytes read. - pub fn poll_peek(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll> { + pub fn poll_peek(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll> { try_io!(self.socket.peek(buf)) } @@ -102,20 +102,20 @@ impl UdpSocket { /// Sends data to the given `target` address. On success, returns the number /// of bytes written. - pub fn poll_send_to(&mut self, _waker: &LocalWaker, buf: &[u8], target: SocketAddr) -> Poll> { + pub fn poll_send_to(&mut self, _waker: &Waker, buf: &[u8], target: SocketAddr) -> Poll> { try_io!(self.socket.send_to(buf, target)) } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. - pub fn poll_recv_from(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll> { + pub fn poll_recv_from(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll> { try_io!(self.socket.recv_from(buf)) } /// Receives data from the socket, without removing it from the input queue. /// On success, returns the number of bytes read and the address from whence /// the data came. - pub fn poll_peek_from(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll> { + pub fn poll_peek_from(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll> { try_io!(self.socket.peek_from(buf)) } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 4c496a8bb..1061369d2 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -4,7 +4,7 @@ use std::cell::RefMut; use std::collections::BinaryHeap; use std::mem; use std::pin::Pin; -use std::task::LocalWaker; +use std::task::Waker; use log::{debug, trace}; use slab::Slab; @@ -145,7 +145,7 @@ impl<'s> AddingProcess<'s> { /// Add a new inactive actor process to the scheduler. pub fn add_actor(self, priority: Priority, supervisor: S, new_actor: NA, actor: NA::Actor, mailbox: Shared>, - waker: LocalWaker + waker: Waker ) where S: Supervisor<::Error, NA::Argument> + 'static, NA: NewActor + 'static, diff --git a/src/scheduler/process/actor.rs b/src/scheduler/process/actor.rs index 50dbd18b8..c6fbfffc2 100644 --- a/src/scheduler/process/actor.rs +++ b/src/scheduler/process/actor.rs @@ -2,7 +2,7 @@ use std::fmt; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use std::time::{Duration, Instant}; use log::{error, trace}; @@ -26,14 +26,14 @@ pub struct ActorProcess { /// is restarted. inbox: Shared>, /// Waker used in the futures context. - waker: LocalWaker, + waker: Waker, } impl ActorProcess { /// Create a new `ActorProcess`. pub(crate) const fn new(id: ProcessId, priority: Priority, supervisor: S, new_actor: NA, actor: NA::Actor, inbox: Shared>, - waker: LocalWaker + waker: Waker ) -> ActorProcess { ActorProcess { id, diff --git a/src/timer.rs b/src/timer.rs index 5f2ca77d1..f8884558c 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -14,7 +14,7 @@ use std::future::Future; use std::pin::Pin; -use std::task::{LocalWaker, Poll}; +use std::task::{Waker, Poll}; use std::time::{Duration, Instant}; use futures_core::stream::{FusedStream, Stream}; @@ -86,7 +86,7 @@ impl Timer { impl Future for Timer { type Output = DeadlinePassed; - fn poll(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, _waker: &Waker) -> Poll { if self.deadline <= Instant::now() { Poll::Ready(DeadlinePassed) } else { @@ -110,7 +110,7 @@ impl Future for Timer { /// /// # use std::future::Future; /// # use std::pin::Pin; -/// # use std::task::{LocalWaker, Poll}; +/// # use std::task::{Waker, Poll}; /// use std::time::Duration; /// /// use heph::actor::Context; @@ -120,7 +120,7 @@ impl Future for Timer { /// # /// # impl Future for OtherFuture { /// # type Output = (); -/// # fn poll(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll { +/// # fn poll(self: Pin<&mut Self>, _waker: &Waker) -> Poll { /// # Poll::Pending /// # } /// # } @@ -176,7 +176,7 @@ impl Future for Deadline { type Output = Result; - fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll { + fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { if self.deadline <= Instant::now() { Poll::Ready(Err(DeadlinePassed)) } else { @@ -261,7 +261,7 @@ impl Interval { impl Stream for Interval { type Item = DeadlinePassed; - fn poll_next(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll> { + fn poll_next(self: Pin<&mut Self>, _waker: &Waker) -> Poll> { if self.deadline <= Instant::now() { // Determine the next deadline. let next_deadline = Instant::now() + self.interval; diff --git a/src/waker.rs b/src/waker.rs index 0df8c62ca..96363d9ae 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -1,33 +1,14 @@ //! Module containing the futures `Wake` implementation. -use std::sync::Arc; -use std::task::{local_waker_from_nonlocal, LocalWaker, Wake}; +use std::task::Waker; use crossbeam_channel::Sender; use crate::scheduler::ProcessId; -/// Create a new `LocalWaker`. +/// Create a new `Waker`. /// /// The implementation will send `ProcessId` into the `sender` channel. -pub fn new_waker(pid: ProcessId, sender: Sender) -> LocalWaker { - let waker = Arc::new(Waker { - pid, - sender, - }); - local_waker_from_nonlocal(waker) -} - -/// The implementation behind [`new_waker`]. -#[derive(Debug)] -struct Waker { - pid: ProcessId, - sender: Sender, -} - -impl Wake for Waker { - fn wake(arc_self: &Arc) { - // We can't do anything if we encounter an error. - let _ = arc_self.sender.send(arc_self.pid); - } +pub fn new_waker(pid: ProcessId, sender: Sender) -> Waker { + unimplemented!(); }