Skip to content

Commit

Permalink
futures: add support for std::futures::Future and futures 0.3
Browse files Browse the repository at this point in the history
- To support async functions, the inner future must be boxed.
- If someone can figure out how to support !Unpin futures, please do so.
  • Loading branch information
mbilker committed Apr 10, 2019
1 parent f135737 commit ac50c45
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
3 changes: 2 additions & 1 deletion tokio-trace-futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ version = "0.0.1"
authors = ["Eliza Weisman <eliza@buoyant.io>"]

[features]
default = ["with-tokio"]
default = ["with-std-future", "with-tokio"]
with-std-future = []
with-tokio = ["tokio"]

[dependencies]
Expand Down
44 changes: 34 additions & 10 deletions tokio-trace-futures/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
#![cfg_attr(feature = "with-std-future", feature(futures_api))]

extern crate futures;
#[cfg(feature = "with-tokio")]
extern crate tokio;
#[cfg_attr(test, macro_use)]
extern crate tokio_trace;

use futures::{Future, Poll, Sink, StartSend, Stream};
#[cfg(feature = "with-std-future")]
use std::{
pin::Pin,
task::Waker,
};

use futures::{Sink, StartSend, Stream};
use tokio_trace::{dispatcher, Dispatch, Span};

pub mod executor;
Expand All @@ -14,6 +22,10 @@ pub trait Instrument: Sized {
fn instrument(self, span: Span) -> Instrumented<Self> {
Instrumented { inner: self, span }
}

fn boxed_instrument(self, span: Span) -> Instrumented<Pin<Box<Self>>> {
Instrumented { inner: Box::pin(self), span }
}
}

pub trait WithSubscriber: Sized {
Expand Down Expand Up @@ -42,11 +54,23 @@ pub struct WithDispatch<T> {

impl<T: Sized> Instrument for T {}

impl<T: Future> Future for Instrumented<T> {
#[cfg(feature = "with-std-future")]
impl<P: std::future::Future> std::future::Future for Instrumented<Pin<Box<P>>> {
type Output = P::Output;

fn poll(self: Pin<&mut Self>, lw: &Waker) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let span = &mut this.span;
let inner = this.inner.as_mut();
span.enter(|| P::poll(inner, lw))
}
}

impl<T: futures::Future> futures::Future for Instrumented<T> {
type Item = T::Item;
type Error = T::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let span = &mut self.span;
let inner = &mut self.inner;
span.enter(|| inner.poll())
Expand All @@ -57,7 +81,7 @@ impl<T: Stream> Stream for Instrumented<T> {
type Item = T::Item;
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
let span = &mut self.span;
let inner = &mut self.inner;
span.enter(|| inner.poll())
Expand All @@ -74,7 +98,7 @@ impl<T: Sink> Sink for Instrumented<T> {
span.enter(|| inner.start_send(item))
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
let span = &mut self.span;
let inner = &mut self.inner;
span.enter(|| inner.poll_complete())
Expand Down Expand Up @@ -102,11 +126,11 @@ impl<T> Instrumented<T> {

impl<T: Sized> WithSubscriber for T {}

impl<T: Future> Future for WithDispatch<T> {
impl<T: futures::Future> futures::Future for WithDispatch<T> {
type Item = T::Item;
type Error = T::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let inner = &mut self.inner;
dispatcher::with_default(&self.dispatch, || inner.poll())
}
Expand Down Expand Up @@ -139,7 +163,7 @@ mod tests {
extern crate tokio;

use super::{test_support::*, *};
use futures::{future, stream, task, Async};
use futures::{Async, Future, future, stream, task};
use tokio_trace::{subscriber::with_default, Level};

struct PollN<T, E> {
Expand All @@ -148,10 +172,10 @@ mod tests {
polls: usize,
}

impl<T, E> Future for PollN<T, E> {
impl<T, E> futures::Future for PollN<T, E> {
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
self.polls += 1;
if self.polls == self.finish_at {
self.and_return
Expand Down

0 comments on commit ac50c45

Please sign in to comment.