Skip to content

Commit

Permalink
Merge pull request #252 from alexrudy/feature/client/transport-dyn
Browse files Browse the repository at this point in the history
feat: Transport trait is dyn-compatible
  • Loading branch information
alexrudy authored Dec 31, 2024
2 parents 76633ad + c31e0d0 commit 074dc73
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 26 deletions.
5 changes: 2 additions & 3 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use hyperdriver::info::HasConnectionInfo;
use hyperdriver::server::Accept;
use hyperdriver::service::{make_service_fn, RequestExecutor};
use hyperdriver::stream::TcpStream;
use hyperdriver::IntoRequestParts;
use pin_project::pin_project;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpListener;
Expand Down Expand Up @@ -392,8 +391,8 @@ impl Transport for TransportNotSend {

type Future = Pin<Box<dyn Future<Output = Result<Self::IO, Self::Error>> + Send>>;

fn connect<R: IntoRequestParts>(&mut self, req: R) -> <Self as Transport>::Future {
self.tcp.connect(req.into_request_parts()).boxed()
fn connect(&mut self, req: http::request::Parts) -> <Self as Transport>::Future {
self.tcp.connect(req).boxed()
}

fn poll_ready(
Expand Down
2 changes: 1 addition & 1 deletion src/client/conn/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ where
+ Send
+ Sync
+ 'static,
T: Transport + Send + 'static,
T: Transport + Clone + Send + 'static,
T::IO: Unpin,
<<T as Transport>::IO as HasConnectionInfo>::Addr: Send,
S: tower::Service<ExecuteRequest<C, BIn>, Response = http::Response<BOut>>
Expand Down
53 changes: 34 additions & 19 deletions src/client/conn/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod tls;
/// To implement a transport stream, implement a [`tower::Service`] which accepts a URI and returns
/// an IO stream, which must be compatible with a [`super::Protocol`]. For example, HTTP protocols
/// require an IO stream which implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncWrite`].
pub trait Transport: Clone + Send {
pub trait Transport: Send {
/// The type of IO stream used by this transport
type IO: HasConnectionInfo + Send + 'static;

Expand All @@ -54,9 +54,7 @@ pub trait Transport: Clone + Send {
type Future: Future<Output = Result<Self::IO, <Self as Transport>::Error>> + Send + 'static;

/// Connect to a remote server and return a stream.
fn connect<R>(&mut self, req: R) -> <Self as Transport>::Future
where
R: IntoRequestParts;
fn connect(&mut self, req: http::request::Parts) -> <Self as Transport>::Future;

/// Poll the transport to see if it is ready to accept a new connection.
fn poll_ready(
Expand All @@ -78,10 +76,10 @@ where
type Error = T::Error;
type Future = T::Future;

fn connect<R>(&mut self, req: R) -> <Self as Service<http::request::Parts>>::Future
where
R: IntoRequestParts,
{
fn connect(
&mut self,
req: http::request::Parts,
) -> <Self as Service<http::request::Parts>>::Future {
self.call(req.into_request_parts())
}

Expand Down Expand Up @@ -113,10 +111,7 @@ where
type Error = T::Error;
type Future = T::Future;

fn connect<R>(&mut self, req: R) -> <Self as Transport>::Future
where
R: IntoRequestParts,
{
fn connect(&mut self, req: http::request::Parts) -> <Self as Transport>::Future {
let parts = req.into_request_parts();
self.0.call(parts.uri)
}
Expand All @@ -131,12 +126,21 @@ where

/// Extension trait for Transports to provide additional configuration options.
pub trait TransportExt: Transport {
/// Connect to a remote server and return a stream.
fn connect_with<R>(&mut self, req: R) -> <Self as Transport>::Future
where
R: IntoRequestParts,
{
self.connect(req.into_request_parts())
}

#[cfg(feature = "stream")]
/// Wrap the transport in a converter which produces a Stream
fn into_stream(self) -> IntoStream<Self>
where
Self::IO: Into<Stream> + AsyncRead + AsyncWrite + Unpin + Send + 'static,
<<Self as Transport>::IO as HasConnectionInfo>::Addr: Into<BraidAddr>,
Self: Sized,
{
IntoStream::new(self)
}
Expand Down Expand Up @@ -489,6 +493,7 @@ mod oneshot {
use crate::IntoRequestParts;

use super::Transport;
use super::TransportExt;

#[pin_project::pin_project(project=OneshotStateProj)]
enum OneshotState<T: Transport, R> {
Expand Down Expand Up @@ -542,7 +547,7 @@ mod oneshot {
match this.state.as_mut().project() {
OneshotStateProj::Pending { transport, request } => {
ready!(transport.poll_ready(cx))?;
let fut = transport.connect(request.take().unwrap());
let fut = transport.connect_with(request.take().unwrap());
this.state.set(OneshotState::Ready(fut));
}
OneshotStateProj::Ready(fut) => {
Expand All @@ -554,15 +559,25 @@ mod oneshot {
}
}

#[cfg(all(test, feature = "stream"))]
#[cfg(test)]
mod tests {
use super::*;

use crate::{info::HasTlsConnectionInfo, stream::tcp::TcpStream};
use static_assertions::assert_impl_all;
use std::future::Ready;

assert_impl_all!(Stream: HasTlsConnectionInfo, HasConnectionInfo);
assert_impl_all!(Stream: Send, Sync, Unpin);
use static_assertions::assert_obj_safe;
assert_obj_safe!(Transport<IO=(), Error=(), Future=Ready<Result<(),()>>>);

assert_impl_all!(TcpStream: HasConnectionInfo);
#[cfg(feature = "stream")]
mod stream {
use super::*;

use crate::{info::HasTlsConnectionInfo, stream::tcp::TcpStream};
use static_assertions::assert_impl_all;

assert_impl_all!(Stream: HasTlsConnectionInfo, HasConnectionInfo);
assert_impl_all!(Stream: Send, Sync, Unpin);

assert_impl_all!(TcpStream: HasConnectionInfo);
}
}
6 changes: 3 additions & 3 deletions src/client/pool/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where

impl<T, P, S, BIn, K> ConnectionPoolService<T, P, S, BIn, K>
where
T: Transport,
T: Transport + Clone,
T::IO: Unpin,
P: Protocol<T::IO, BIn, Error = ConnectionError> + Clone + Send + Sync + 'static,
<P as Protocol<T::IO, BIn>>::Connection: PoolableConnection<BIn> + Send + 'static,
Expand Down Expand Up @@ -258,7 +258,7 @@ where
+ Send
+ Sync
+ 'static,
T: Transport + Send + 'static,
T: Transport + Clone + Send + 'static,
T::IO: PoolableStream + Unpin,
<<T as Transport>::IO as HasConnectionInfo>::Addr: Send,
S: tower::Service<ExecuteRequest<Pooled<C, BIn>, BIn>, Response = http::Response<BOut>>
Expand Down Expand Up @@ -302,7 +302,7 @@ where
+ Send
+ Sync
+ 'static,
T: Transport + 'static,
T: Transport + Clone + 'static,
T::IO: PoolableStream + Unpin,
S: tower::Service<ExecuteRequest<Pooled<C, BIn>, BIn>, Response = http::Response<BOut>>
+ Clone
Expand Down

0 comments on commit 074dc73

Please sign in to comment.