Skip to content

Commit

Permalink
Update to futures alpha 13
Browse files Browse the repository at this point in the history
This breaks the waker implementation, will be fixed in future commits.

Closes #163.
  • Loading branch information
Thomasdezeeuw committed Feb 23, 2019
1 parent 976ddd0 commit 375ef94
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 68 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ test = []

[dependencies]
crossbeam-channel = "0.3.6"
futures-core-preview = "0.3.0-alpha.12"
futures-io-preview = "0.3.0-alpha.12"
futures-core-preview = "0.3.0-alpha.13"
futures-io-preview = "0.3.0-alpha.13"
log = "0.4.6"
mio-st = "0.2.1"
num_cpus = "1.8.0"
slab = "0.4.0"
std-logger = "0.3.2"

[dev-dependencies]
futures-test-preview = "0.3.0-alpha.12"
futures-util-preview = "0.3.0-alpha.12"
futures-test-preview = "0.3.0-alpha.13"
futures-util-preview = "0.3.0-alpha.13"
lazy_static = "1.1.0"
4 changes: 2 additions & 2 deletions src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::future::Future;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

use crate::actor_ref::{ActorRef, Local};
use crate::mailbox::MailBox;
Expand Down Expand Up @@ -245,7 +245,7 @@ impl<'ctx, M, S> Future for ReceiveMessage<'ctx, M, S>
{
type Output = M;

fn poll(mut self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, _waker: &Waker) -> Poll<Self::Output> {
let ReceiveMessage {
ref mut inbox,
ref mut selector,
Expand Down
6 changes: 3 additions & 3 deletions src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

mod context;

Expand Down Expand Up @@ -269,15 +269,15 @@ pub trait Actor {
///
/// Just like with [`Future`]s polling after it returned [`Poll::Ready`] may
/// cause undefined behaviour, including but not limited to panicking.
fn try_poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), Self::Error>>;
fn try_poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::Error>>;
}

impl<Fut, E> Actor for Fut
where Fut: Future<Output = Result<(), E>>
{
type Error = E;

fn try_poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Result<(), Self::Error>> {
fn try_poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Result<(), Self::Error>> {
self.poll(waker)
}
}
8 changes: 4 additions & 4 deletions src/channel/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::VecDeque;
use std::future::Future;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

use futures_core::stream::Stream;

Expand Down Expand Up @@ -98,7 +98,7 @@ impl<T> Receiver<T> {
impl<T: Unpin> Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, lw: &Waker) -> Poll<Option<Self::Item>> {
let this = Pin::get_mut(self);
match this.try_receive() {
Ok(Some(value)) => Poll::Ready(Some(value)),
Expand All @@ -122,7 +122,7 @@ pub struct ReceiveOne<'r, T> {
impl<'r, T: Unpin> Future for ReceiveOne<'r, T> {
type Output = Result<T, NoValue>;

fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll<Self::Output> {
let this = Pin::get_mut(self);
match this.inner.try_receive() {
Ok(Some(value)) => Poll::Ready(Ok(value)),
Expand All @@ -142,7 +142,7 @@ struct ChannelInner<T> {
values: VecDeque<T>,
/// Waker set by calling `Receiver.poll_next` or `ReceiveOne.poll` and
/// awoken by `Sender.send`, if set.
waker: Option<LocalWaker>,
waker: Option<Waker>,
}

impl<T> ChannelInner<T> {
Expand Down
6 changes: 3 additions & 3 deletions src/channel/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
use std::future::Future;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

use crate::channel::{NoReceiver, NoValue};
use crate::util::Shared;
Expand Down Expand Up @@ -126,7 +126,7 @@ pub struct Receiver<T> {
impl<T: Unpin> Future for Receiver<T> {
type Output = Result<T, NoValue>;

fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll<Self::Output> {
let this = Pin::get_mut(self);
if let Some(value) = this.inner.borrow_mut().value.take() {
return Poll::Ready(Ok(value));
Expand All @@ -148,7 +148,7 @@ struct ChannelInner<T> {
value: Option<T>,
/// Waker possibly set by calling `Receiver.poll` and awoken by
/// `Sender.send`, if set.
waker: Option<LocalWaker>,
waker: Option<Waker>,
}

/// Creates a new asynchronous one shot channel, returning the sending and
Expand Down
8 changes: 4 additions & 4 deletions src/channel/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::VecDeque;
use std::future::Future;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

use futures_core::stream::Stream;

Expand Down Expand Up @@ -79,7 +79,7 @@ impl<T> Receiver<T> {
impl<T: Unpin> Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, lw: &Waker) -> Poll<Option<Self::Item>> {
let this = Pin::get_mut(self);
match this.try_receive() {
Ok(Some(value)) => Poll::Ready(Some(value)),
Expand All @@ -103,7 +103,7 @@ pub struct ReceiveOne<'r, T> {
impl<'r, T: Unpin> Future for ReceiveOne<'r, T> {
type Output = Result<T, NoValue>;

fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, lw: &Waker) -> Poll<Self::Output> {
let this = Pin::get_mut(self);
match this.inner.try_receive() {
Ok(Some(value)) => Poll::Ready(Ok(value)),
Expand All @@ -122,7 +122,7 @@ struct ChannelInner<T> {
values: VecDeque<T>,
/// Waker set by calling `Receiver.poll_next` or `ReceiveOne.poll` and
/// awoken by `Sender.send`, if set.
waker: Option<LocalWaker>,
waker: Option<Waker>,
}

/// Creates a new asynchronous unbounded channel, returning the sending and
Expand Down
14 changes: 7 additions & 7 deletions src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt;
use std::io::{self, Read, Write};
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

use futures_io::{AsyncRead, AsyncWrite, Initializer};
use log::debug;
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<S, NA> Actor for TcpListener<S, NA>
{
type Error = TcpListenerError<NA::Error>;

fn try_poll(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll<Result<(), Self::Error>> {
fn try_poll(self: Pin<&mut Self>, _waker: &Waker) -> Poll<Result<(), Self::Error>> {
// This is safe because only the `ActorSystemRef`, `MioTcpListener` and
// the `MailBox` are mutably borrowed and all are `Unpin`.
let &mut TcpListener {
Expand Down Expand Up @@ -407,7 +407,7 @@ impl TcpStream {
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked. Successive calls return the same
/// data.
pub fn poll_peek(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
pub fn poll_peek(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
try_io!(self.inner.peek(buf))
}

Expand Down Expand Up @@ -435,21 +435,21 @@ impl AsyncRead for TcpStream {
Initializer::nop()
}

fn poll_read(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
fn poll_read(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
try_io!(self.inner.read(buf))
}
}

impl AsyncWrite for TcpStream {
fn poll_write(&mut self, _waker: &LocalWaker, buf: &[u8]) -> Poll<io::Result<usize>> {
fn poll_write(&mut self, _waker: &Waker, buf: &[u8]) -> Poll<io::Result<usize>> {
try_io!(self.inner.write(buf))
}

fn poll_flush(&mut self, _waker: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_flush(&mut self, _waker: &Waker) -> Poll<io::Result<()>> {
try_io!(self.inner.flush())
}

fn poll_close(&mut self, waker: &LocalWaker) -> Poll<io::Result<()>> {
fn poll_close(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
self.poll_flush(waker)
}
}
14 changes: 7 additions & 7 deletions src/net/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::io;
use std::net::SocketAddr;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};

use mio_st::net::{ConnectedUdpSocket as MioConnectedUdpSocket, UdpSocket as MioUdpSocket};
use mio_st::poll::PollOption;
Expand Down Expand Up @@ -40,19 +40,19 @@ impl ConnectedUdpSocket {

/// Sends data on the socket to the connected socket. On success, returns
/// the number of bytes written.
pub fn poll_send(&mut self, _waker: &LocalWaker, buf: &[u8]) -> Poll<io::Result<usize>> {
pub fn poll_send(&mut self, _waker: &Waker, buf: &[u8]) -> Poll<io::Result<usize>> {
try_io!(self.socket.send(buf))
}

/// Receives data from the socket. On success, returns the number of bytes
/// read.
pub fn poll_recv(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
pub fn poll_recv(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
try_io!(self.socket.recv(buf))
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read.
pub fn poll_peek(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
pub fn poll_peek(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<usize>> {
try_io!(self.socket.peek(buf))
}

Expand Down Expand Up @@ -102,20 +102,20 @@ impl UdpSocket {

/// Sends data to the given `target` address. On success, returns the number
/// of bytes written.
pub fn poll_send_to(&mut self, _waker: &LocalWaker, buf: &[u8], target: SocketAddr) -> Poll<io::Result<usize>> {
pub fn poll_send_to(&mut self, _waker: &Waker, buf: &[u8], target: SocketAddr) -> Poll<io::Result<usize>> {
try_io!(self.socket.send_to(buf, target))
}

/// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came.
pub fn poll_recv_from(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<(usize, SocketAddr)>> {
pub fn poll_recv_from(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<(usize, SocketAddr)>> {
try_io!(self.socket.recv_from(buf))
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
pub fn poll_peek_from(&mut self, _waker: &LocalWaker, buf: &mut [u8]) -> Poll<io::Result<(usize, SocketAddr)>> {
pub fn poll_peek_from(&mut self, _waker: &Waker, buf: &mut [u8]) -> Poll<io::Result<(usize, SocketAddr)>> {
try_io!(self.socket.peek_from(buf))
}

Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::cell::RefMut;
use std::collections::BinaryHeap;
use std::mem;
use std::pin::Pin;
use std::task::LocalWaker;
use std::task::Waker;

use log::{debug, trace};
use slab::Slab;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl<'s> AddingProcess<'s> {
/// Add a new inactive actor process to the scheduler.
pub fn add_actor<S, NA>(self, priority: Priority, supervisor: S,
new_actor: NA, actor: NA::Actor, mailbox: Shared<MailBox<NA::Message>>,
waker: LocalWaker
waker: Waker
)
where S: Supervisor<<NA::Actor as Actor>::Error, NA::Argument> + 'static,
NA: NewActor + 'static,
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/process/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::fmt;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};
use std::time::{Duration, Instant};

use log::{error, trace};
Expand All @@ -26,14 +26,14 @@ pub struct ActorProcess<S, NA: NewActor> {
/// is restarted.
inbox: Shared<MailBox<NA::Message>>,
/// Waker used in the futures context.
waker: LocalWaker,
waker: Waker,
}

impl<S, NA: NewActor> ActorProcess<S, NA> {
/// Create a new `ActorProcess`.
pub(crate) const fn new(id: ProcessId, priority: Priority, supervisor: S,
new_actor: NA, actor: NA::Actor, inbox: Shared<MailBox<NA::Message>>,
waker: LocalWaker
waker: Waker
) -> ActorProcess<S, NA> {
ActorProcess {
id,
Expand Down
12 changes: 6 additions & 6 deletions src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::future::Future;
use std::pin::Pin;
use std::task::{LocalWaker, Poll};
use std::task::{Waker, Poll};
use std::time::{Duration, Instant};

use futures_core::stream::{FusedStream, Stream};
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Timer {
impl Future for Timer {
type Output = DeadlinePassed;

fn poll(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, _waker: &Waker) -> Poll<Self::Output> {
if self.deadline <= Instant::now() {
Poll::Ready(DeadlinePassed)
} else {
Expand All @@ -110,7 +110,7 @@ impl Future for Timer {
///
/// # use std::future::Future;
/// # use std::pin::Pin;
/// # use std::task::{LocalWaker, Poll};
/// # use std::task::{Waker, Poll};
/// use std::time::Duration;
///
/// use heph::actor::Context;
Expand All @@ -120,7 +120,7 @@ impl Future for Timer {
/// #
/// # impl Future for OtherFuture {
/// # type Output = ();
/// # fn poll(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll<Self::Output> {
/// # fn poll(self: Pin<&mut Self>, _waker: &Waker) -> Poll<Self::Output> {
/// # Poll::Pending
/// # }
/// # }
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<Fut> Future for Deadline<Fut>
{
type Output = Result<Fut::Output, DeadlinePassed>;

fn poll(self: Pin<&mut Self>, waker: &LocalWaker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
if self.deadline <= Instant::now() {
Poll::Ready(Err(DeadlinePassed))
} else {
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Interval {
impl Stream for Interval {
type Item = DeadlinePassed;

fn poll_next(self: Pin<&mut Self>, _waker: &LocalWaker) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, _waker: &Waker) -> Poll<Option<Self::Item>> {
if self.deadline <= Instant::now() {
// Determine the next deadline.
let next_deadline = Instant::now() + self.interval;
Expand Down
Loading

0 comments on commit 375ef94

Please sign in to comment.