Skip to content

Commit

Permalink
Replace tokio-timer with futures-timer for higher resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
doyoubi committed Nov 2, 2019
1 parent aa92827 commit b343e90
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 21 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ repository = "https://github.com/mre/tokio-batch"
futures-preview = { version = "0.3.0-alpha.19", features = ["compat"] }
pin-utils = "0.1.0-alpha.4"
tokio = "0.1.22"
tokio-timer = "0.2.11"
futures01 = { package = "futures", version = "0.1.29" }
futures-timer = "1.0.2"

[dev-dependencies.doc-comment]
version = "0.3"
30 changes: 11 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@ doctest!("../README.md");
use core::mem;
use core::pin::Pin;
use futures::stream::{Fuse, FusedStream, Stream};
use futures::Future;
use futures::task::{Context, Poll};
use futures::StreamExt;
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};

use futures01::Async;
use std::time::{Duration, Instant};
use tokio::prelude::Future;
use tokio::timer::Delay;
use std::time::Duration;
use futures_timer::Delay;

pub trait ChunksTimeoutStreamExt: Stream {
fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self>
Expand Down Expand Up @@ -48,7 +47,7 @@ where
St: Stream,
{
unsafe_unpinned!(items: Vec<St::Item>);
unsafe_unpinned!(clock: Option<Delay>);
unsafe_pinned!(clock: Option<Delay>);
unsafe_pinned!(stream: Fuse<St>);

pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout<St> {
Expand Down Expand Up @@ -114,7 +113,7 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
Some(item) => {
if self.items.is_empty() {
*self.as_mut().clock() =
Some(Delay::new(Instant::now() + self.duration));
Some(Delay::new(self.duration));
}
self.as_mut().items().push(item);
if self.items.len() >= self.cap {
Expand Down Expand Up @@ -143,25 +142,20 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
Poll::Pending => {}
}

match self.as_mut().clock().poll() {
Ok(Async::Ready(Some(()))) => {
match self.as_mut().clock().as_pin_mut().map(|clock| clock.poll(cx)) {
Some(Poll::Ready(())) => {
*self.as_mut().clock() = None;
return Poll::Ready(Some(self.as_mut().take()));
}
Ok(Async::Ready(None)) => {
Some(Poll::Pending) => {}
None => {
debug_assert!(
self.as_mut().items().is_empty(),
"Inner buffer is empty, but clock is available."
);
}
Ok(Async::NotReady) => {}
Err(_e) => {
if !self.as_mut().items().is_empty() {
*self.as_mut().clock() = None;
return Poll::Ready(Some(self.as_mut().take()));
}
}
}

return Poll::Pending;
}
}
Expand Down Expand Up @@ -198,7 +192,6 @@ where
#[cfg(test)]
mod tests {
use super::*;
use futures::compat::Future01CompatExt;
use futures::future;
use futures::{stream, FutureExt, StreamExt, TryFutureExt};
use std::iter;
Expand Down Expand Up @@ -266,8 +259,7 @@ mod tests {

let iter = vec![5].into_iter();
let stream1 = stream::iter(iter).then(move |n| {
Delay::new(Instant::now() + Duration::from_millis(300))
.compat()
Delay::new(Duration::from_millis(300))
.map(move |_| n)
});

Expand Down

0 comments on commit b343e90

Please sign in to comment.