Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to pin-project-lite #2393

Merged
merged 2 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ httparse = "1.0"
h2 = { version = "0.3", optional = true }
itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project = "1.0"
pin-project-lite = "0.2.4"
tower-service = "0.3"
tokio = { version = "1", features = ["sync"] }
want = "0.3"
Expand Down
6 changes: 2 additions & 4 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{task, watch, Pin, Poll};
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Expand Down Expand Up @@ -74,8 +73,7 @@ struct Extra {
delayed_eof: Option<DelayEof>,
}

#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
type DelayEofUntil = oneshot::Receiver<Never>;

enum DelayEof {
Expand Down
57 changes: 35 additions & 22 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,21 @@ use std::fmt;
#[cfg(feature = "http2")]
use std::marker::PhantomData;
use std::sync::Arc;
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg(all(feature = "runtime", feature = "http2"))]
use std::time::Duration;

use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;

use super::dispatch;
use crate::body::HttpBody;
use crate::common::{task, exec::{BoxSendFuture, Exec}, Future, Pin, Poll};
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::proto;
use crate::rt::Executor;
#[cfg(feature = "http1")]
Expand All @@ -73,15 +75,23 @@ use crate::{Body, Request, Response};
#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;

#[pin_project(project = ProtoClientProj)]
enum ProtoClient<T, B>
where
B: HttpBody,
{
#[cfg(feature = "http1")]
H1(#[pin] Http1Dispatcher<T, B, proto::h1::ClientTransaction>),
#[cfg(feature = "http2")]
H2(#[pin] proto::h2::ClientTask<B>, PhantomData<fn(T)>),
pin_project! {
#[project = ProtoClientProj]
enum ProtoClient<T, B>
where
B: HttpBody,
{
#[cfg(feature = "http1")]
H1 {
#[pin]
h1: Http1Dispatcher<T, B, proto::h1::ClientTransaction>,
},
#[cfg(feature = "http2")]
H2 {
#[pin]
h2: proto::h2::ClientTask<B>, _phantom: PhantomData<fn(T)>,
},
}
}

/// Returns a handshake future over some IO.
Expand Down Expand Up @@ -398,7 +408,7 @@ where
pub fn into_parts(self) -> Parts<T> {
match self.inner.expect("already upgraded") {
#[cfg(feature = "http1")]
ProtoClient::H1(h1) => {
ProtoClient::H1 { h1 } => {
let (io, read_buf, _) = h1.into_inner();
Parts {
io,
Expand All @@ -407,7 +417,7 @@ where
}
}
#[cfg(feature = "http2")]
ProtoClient::H2(..) => {
ProtoClient::H2 { .. } => {
panic!("http2 cannot into_inner");
}
}
Expand All @@ -427,9 +437,9 @@ where
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match *self.inner.as_mut().expect("already upgraded") {
#[cfg(feature = "http1")]
ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx),
ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
#[cfg(feature = "http2")]
ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()),
ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
}
}

Expand Down Expand Up @@ -458,7 +468,7 @@ where
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
#[cfg(feature = "http1")]
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Some(ProtoClient::H1(h1)) => {
Some(ProtoClient::H1 { h1 }) => {
let (io, buf, _) = h1.into_inner();
pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(()))
Expand Down Expand Up @@ -705,14 +715,17 @@ impl Builder {
}
let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn);
ProtoClient::H1(dispatch)
ProtoClient::H1 { h1: dispatch }
}
#[cfg(feature = "http2")]
Proto::Http2 => {
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
.await?;
ProtoClient::H2(h2, PhantomData)
ProtoClient::H2 {
h2,
_phantom: PhantomData,
}
}
};

Expand Down Expand Up @@ -766,9 +779,9 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() {
#[cfg(feature = "http1")]
ProtoClientProj::H1(c) => c.poll(cx),
ProtoClientProj::H1 { h1 } => h1.poll(cx),
#[cfg(feature = "http2")]
ProtoClientProj::H2(c, _) => c.poll(cx),
ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
}
}
}
Expand Down
27 changes: 14 additions & 13 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Duration;

use futures_util::future::Either;
use http::uri::{Scheme, Uri};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::net::{TcpSocket, TcpStream};
use tokio::time::Sleep;

Expand Down Expand Up @@ -373,18 +373,19 @@ impl HttpInfo {
}
}

// Not publicly exported (so missing_docs doesn't trigger).
//
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
// (and thus we can change the type in the future).
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[allow(missing_debug_implementations)]
pub struct HttpConnecting<R> {
#[pin]
fut: BoxConnecting,
_marker: PhantomData<R>,
pin_project! {
// Not publicly exported (so missing_docs doesn't trigger).
//
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
// (and thus we can change the type in the future).
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct HttpConnecting<R> {
#[pin]
fut: BoxConnecting,
_marker: PhantomData<R>,
}
}

type ConnectResult = Result<TcpStream, ConnectError>;
Expand Down
3 changes: 1 addition & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
pub use self::connect::HttpConnector;

pub mod connect;
#[cfg(test)]
#[cfg(feature = "runtime")]
#[cfg(all(test, feature = "runtime"))]
mod tests;

cfg_feature! {
Expand Down
25 changes: 13 additions & 12 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_channel::oneshot;
use tokio::time::{Duration, Instant, Interval};

use super::client::Ver;
use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin};
use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -714,16 +714,17 @@ impl Expiration {
}

#[cfg(feature = "runtime")]
#[pin_project::pin_project]
struct IdleTask<T> {
#[pin]
interval: Interval,
pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
#[pin]
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
pin_project_lite::pin_project! {
struct IdleTask<T> {
#[pin]
interval: Interval,
pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
#[pin]
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
}
}

#[cfg(feature = "runtime")]
Expand Down Expand Up @@ -776,7 +777,7 @@ mod tests {
use std::time::Duration;

use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::common::{task, exec::Exec, Future, Pin};
use crate::common::{exec::Exec, task, Future, Pin};

/// Test unique reservations.
#[derive(Debug, PartialEq, Eq)]
Expand Down
19 changes: 10 additions & 9 deletions src/common/drain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::mem;

use pin_project::pin_project;
use pin_project_lite::pin_project;
use tokio::sync::watch;

use super::{task, Future, Pin, Poll};
Expand All @@ -21,14 +21,15 @@ pub(crate) struct Watch {
rx: watch::Receiver<()>,
}

#[allow(missing_debug_implementations)]
#[pin_project]
pub struct Watching<F, FN> {
#[pin]
future: F,
state: State<FN>,
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
_rx: watch::Receiver<()>,
pin_project! {
#[allow(missing_debug_implementations)]
pub struct Watching<F, FN> {
#[pin]
future: F,
state: State<FN>,
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
_rx: watch::Receiver<()>,
}
}

enum State<F> {
Expand Down
3 changes: 1 addition & 2 deletions src/common/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::sync::Arc;

#[cfg(feature = "server")]
use crate::body::{Body, HttpBody};
#[cfg(feature = "http2")]
#[cfg(feature = "server")]
#[cfg(all(feature = "http2", feature = "server"))]
use crate::proto::h2::server::H2Stream;
use crate::rt::Executor;
#[cfg(feature = "server")]
Expand Down
12 changes: 4 additions & 8 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,22 @@ macro_rules! ready {
}

pub(crate) mod buf;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "server")]
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
pub(crate) mod date;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "server")]
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
pub(crate) mod drain;
#[cfg(any(feature = "http1", feature = "http2"))]
pub(crate) mod exec;
pub(crate) mod io;
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(feature = "stream")]
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;

#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "client")]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
pub(crate) use self::lazy::{lazy, Started as Lazy};
#[cfg(any(
feature = "client",
Expand Down
11 changes: 4 additions & 7 deletions src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
use bytes::BytesMut;
use http::header::CONTENT_LENGTH;
use http::header::{HeaderValue, ValueIter};
#[cfg(feature = "http2")]
#[cfg(feature = "client")]
use http::Method;
use http::HeaderMap;
#[cfg(all(feature = "http2", feature = "client"))]
use http::Method;

#[cfg(feature = "http1")]
pub(super) fn connection_keep_alive(value: &HeaderValue) -> bool {
Expand All @@ -29,8 +28,7 @@ fn connection_has(value: &HeaderValue, needle: &str) -> bool {
false
}

#[cfg(feature = "http1")]
#[cfg(feature = "server")]
#[cfg(all(feature = "http1", feature = "server"))]
pub(super) fn content_length_parse(value: &HeaderValue) -> Option<u64> {
value.to_str().ok().and_then(|s| s.parse().ok())
}
Expand Down Expand Up @@ -66,8 +64,7 @@ pub(super) fn content_length_parse_all_values(values: ValueIter<'_, HeaderValue>
}
}

#[cfg(feature = "http2")]
#[cfg(feature = "client")]
#[cfg(all(feature = "http2", feature = "client"))]
pub(super) fn method_has_defined_payload_semantics(method: &Method) -> bool {
match *method {
Method::GET | Method::HEAD | Method::DELETE | Method::CONNECT => false,
Expand Down
11 changes: 7 additions & 4 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ cfg_server! {
}

cfg_client! {
pub(crate) struct Client<B> {
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
rx: ClientRx<B>,
rx_closed: bool,
pin_project_lite::pin_project! {
pub(crate) struct Client<B> {
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
#[pin]
rx: ClientRx<B>,
rx_closed: bool,
}
}

type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
Expand Down
Loading