Skip to content

Commit

Permalink
Use futures_channel for faster and simpler code
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-uk1 committed Dec 5, 2019
1 parent dffd31a commit 3094f1c
Showing 1 changed file with 22 additions and 69 deletions.
91 changes: 22 additions & 69 deletions crates/timers/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
//! `Future`- and `Stream`-backed timers APIs.
use super::sys::*;
use crate::callback::Timeout;
use crate::callback::{Timeout, Interval};

use futures_channel::mpsc;
use futures_channel::{oneshot, mpsc};
use futures_core::stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Poll, Context, Waker};
use std::sync::{Arc, Mutex};
use wasm_bindgen::JsCast;
use wasm_bindgen::prelude::*;
use std::task::{Poll, Context};

/// A scheduled timeout as a `Future`.
///
Expand Down Expand Up @@ -45,15 +41,7 @@ use wasm_bindgen::prelude::*;
#[must_use = "futures do nothing unless polled or spawned"]
pub struct TimeoutFuture {
inner: Timeout,
state: Arc<Mutex<TimeoutFutureState>>,
}

/// A state machine for the timeout future.
#[derive(Debug)]
enum TimeoutFutureState {
Init,
Polled(Waker),
Complete,
rx: oneshot::Receiver<()>,
}

impl TimeoutFuture {
Expand All @@ -75,37 +63,23 @@ impl TimeoutFuture {
/// });
/// ```
pub fn new(millis: u32) -> TimeoutFuture {
let state = Arc::new(Mutex::new(TimeoutFutureState::Init));
let state_ref = Arc::downgrade(&state);
let (tx, rx) = oneshot::channel();
let inner = Timeout::new(millis, move || {
let state = match state_ref.upgrade() {
Some(s) => s,
None => return
};
let mut state = state.lock().expect("mutex should not be poisoned");
match &*state {
TimeoutFutureState::Polled(waker) => {
waker.wake_by_ref();
}
_ => ()
}
(*state) = TimeoutFutureState::Complete;
// if the receiver was dropped we do nothing.
let _ = tx.send(());
});
TimeoutFuture { inner, state }
TimeoutFuture { inner, rx }
}
}

impl Future for TimeoutFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut state = self.state.lock().unwrap();
match *state {
TimeoutFutureState::Init | TimeoutFutureState::Polled(_) => {
(*state) = TimeoutFutureState::Polled(cx.waker().clone());
Poll::Pending
}
TimeoutFutureState::Complete => Poll::Ready(()),
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.rx), cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(t)) => Poll::Ready(t),
Poll::Ready(Err(e)) => panic!("{}", e),
}
}
}
Expand All @@ -120,10 +94,8 @@ impl Future for TimeoutFuture {
#[derive(Debug)]
#[must_use = "streams do nothing unless polled or spawned"]
pub struct IntervalStream {
millis: u32,
id: Option<i32>,
closure: Closure<dyn FnMut()>,
inner: mpsc::UnboundedReceiver<()>,
receiver: mpsc::UnboundedReceiver<()>,
inner: Interval,
}

impl IntervalStream {
Expand All @@ -136,8 +108,8 @@ impl IntervalStream {
/// # Example
///
/// ```no_run
/// use gloo_timers::future::IntervalStream;
/// use futures_util::stream::StreamExt;
/// use gloo_timers::future::IntervalStream;
/// use wasm_bindgen_futures::spawn_local;
///
/// spawn_local(async {
Expand All @@ -148,38 +120,19 @@ impl IntervalStream {
/// ```
pub fn new(millis: u32) -> IntervalStream {
let (sender, receiver) = mpsc::unbounded();
let closure = Closure::wrap(Box::new(move || {
sender.unbounded_send(()).unwrap();
}) as Box<dyn FnMut()>);

IntervalStream {
millis,
id: None,
closure,
inner: receiver,
}
}
}
let inner = Interval::new(millis, move || {
// if the receiver was dropped we do nothing.
let _ = sender.unbounded_send(());
});

impl Drop for IntervalStream {
fn drop(&mut self) {
if let Some(id) = self.id {
clear_interval(id);
}
IntervalStream { receiver, inner }
}
}

impl Stream for IntervalStream {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.id.is_none() {
self.id = Some(set_interval(
self.closure.as_ref().unchecked_ref::<js_sys::Function>(),
self.millis as i32,
));
}

Pin::new(&mut self.inner).poll_next(cx)
Stream::poll_next(Pin::new(&mut self.receiver), cx)
}
}

0 comments on commit 3094f1c

Please sign in to comment.