Skip to content

Commit

Permalink
feat: Add TokioTimer to rt::tokio module (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
daxhuiberts authored Dec 9, 2023
1 parent 1dd414e commit 7af1744
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
pub mod tokio;

#[cfg(feature = "tokio")]
pub use self::tokio::{TokioExecutor, TokioIo};
pub use self::tokio::{TokioExecutor, TokioIo, TokioTimer};
63 changes: 62 additions & 1 deletion src/rt/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use hyper::rt::Executor;
use hyper::rt::{Executor, Sleep, Timer};
use pin_project_lite::pin_project;

/// Future executor that utilises `tokio` threads.
Expand All @@ -24,6 +25,21 @@ pin_project! {
}
}

/// A Timer that uses the tokio runtime.
#[non_exhaustive]
#[derive(Default, Clone, Debug)]
pub struct TokioTimer;

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pin_project! {
#[derive(Debug)]
struct TokioSleep {
#[pin]
inner: tokio::time::Sleep,
}
}

// ===== impl TokioExecutor =====

impl<Fut> Executor<Fut> for TokioExecutor
Expand Down Expand Up @@ -190,6 +206,51 @@ where
}
}

// ==== impl TokioTimer =====

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep(duration),
})
}

fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep_until(deadline.into()),
})
}

fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
sleep.reset(new_deadline)
}
}
}

impl TokioTimer {
/// Create a new TokioTimer
pub fn new() -> Self {
Self {}
}
}

impl Future for TokioSleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

impl Sleep for TokioSleep {}

impl TokioSleep {
fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.project().inner.as_mut().reset(deadline.into());
}
}

#[cfg(test)]
mod tests {
use crate::rt::TokioExecutor;
Expand Down

0 comments on commit 7af1744

Please sign in to comment.