Skip to content

Commit

Permalink
feat(client): add proxy::Tunnel legacy util
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jul 30, 2024
1 parent 7afb1ed commit 7237179
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/client/legacy/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub mod dns;
#[cfg(feature = "tokio")]
mod http;

pub mod proxy;

pub(crate) mod capture;
pub use capture::{capture_connection, CaptureConnection};

Expand Down
5 changes: 5 additions & 0 deletions src/client/legacy/connect/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Proxy helpers

mod tunnel;

pub use self::tunnel::Tunnel;
181 changes: 181 additions & 0 deletions src/client/legacy/connect/proxy/tunnel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::future::Future;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::task::{self, Poll};

use http::{HeaderValue, Uri};
use hyper::rt::{Read, Write};
use pin_project_lite::pin_project;
use tower_service::Service;

/// Tunnel Proxy via HTTP CONNECT
#[derive(Debug)]
pub struct Tunnel<C> {
auth: Option<HeaderValue>,
inner: C,
proxy_dst: Uri,
}

#[derive(Debug)]
pub enum TunnelError<C> {
Inner(C),
Io(std::io::Error),
MissingHost,
ProxyAuthRequired,
ProxyHeadersTooLong,
TunnelUnexpectedEof,
TunnelUnsuccessful,
}

pin_project! {
// Not publicly exported (so missing_docs doesn't trigger).
//
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
// (and thus we can change the type in the future).
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Tunneling<F, T, E> {
#[pin]
fut: BoxTunneling<T, E>,
_marker: PhantomData<F>,
}
}

type BoxTunneling<T, E> = Pin<Box<dyn Future<Output = Result<T, TunnelError<E>>> + Send>>;

impl<C> Tunnel<C> {
/// Create a new Tunnel service.
pub fn new(proxy_dst: Uri, connector: C) -> Self {
Self {
auth: None,
inner: connector,
proxy_dst,
}
}

/// Add `proxy-authorization` header value to the CONNECT request.
pub fn with_auth(mut self, mut auth: HeaderValue) -> Self {
// just in case the user forgot
auth.set_sensitive(true);
self.auth = Some(auth);
self
}
}

impl<C> Service<Uri> for Tunnel<C>
where
C: Service<Uri>,
C::Future: Send + 'static,
C::Response: Read + Write + Unpin + Send + 'static,
C::Error: Send + 'static,
{
type Response = C::Response;
type Error = TunnelError<C::Error>;
type Future = Tunneling<C::Future, C::Response, C::Error>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
futures_util::ready!(self.inner.poll_ready(cx)).map_err(TunnelError::Inner)?;
Poll::Ready(Ok(()))
}

fn call(&mut self, dst: Uri) -> Self::Future {
let connecting = self.inner.call(self.proxy_dst.clone());

Tunneling {
fut: Box::pin(async move {
let conn = connecting.await.map_err(TunnelError::Inner)?;
tunnel(
conn,
dst.host().ok_or(TunnelError::MissingHost)?,
dst.port().map(|p| p.as_u16()).unwrap_or(443),
None,
None,
)
.await
}),
_marker: PhantomData,
}
}
}

impl<F, T, E> Future for Tunneling<F, T, E>
where
F: Future<Output = Result<T, E>>,
{
type Output = Result<T, TunnelError<E>>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}

async fn tunnel<T, E>(
mut conn: T,
host: &str,
port: u16,
user_agent: Option<HeaderValue>,
auth: Option<HeaderValue>,
) -> Result<T, TunnelError<E>>
where
T: Read + Write + Unpin,
{
let mut buf = format!(
"\
CONNECT {host}:{port} HTTP/1.1\r\n\
Host: {host}:{port}\r\n\
"
)
.into_bytes();

// user-agent
if let Some(user_agent) = user_agent {
buf.extend_from_slice(b"User-Agent: ");
buf.extend_from_slice(user_agent.as_bytes());
buf.extend_from_slice(b"\r\n");
}

// proxy-authorization
if let Some(value) = auth {
//log::debug!("tunnel to {host}:{port} using basic auth");
buf.extend_from_slice(b"Proxy-Authorization: ");
buf.extend_from_slice(value.as_bytes());
buf.extend_from_slice(b"\r\n");
}

// headers end
buf.extend_from_slice(b"\r\n");

crate::rt::write_all(&mut conn, &buf)
.await
.map_err(TunnelError::Io)?;

let mut buf = [0; 8192];
let mut pos = 0;

loop {
let n = crate::rt::read(&mut conn, &mut buf[pos..])
.await
.map_err(TunnelError::Io)?;

if n == 0 {
return Err(TunnelError::TunnelUnexpectedEof);
}
pos += n;

let recvd = &buf[..pos];
if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") {
if recvd.ends_with(b"\r\n\r\n") {
return Ok(conn);
}
if pos == buf.len() {
return Err(TunnelError::ProxyHeadersTooLong);
}
// else read more
} else if recvd.starts_with(b"HTTP/1.1 407") {
return Err(TunnelError::ProxyAuthRequired);
} else {
return Err(TunnelError::TunnelUnsuccessful);
}
}
}
33 changes: 33 additions & 0 deletions src/rt/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::marker::Unpin;
use std::pin::Pin;
use std::task::Poll;

use futures_util::future;
use futures_util::ready;
use hyper::rt::{Read, ReadBuf, Write};

pub(crate) async fn read<T>(io: &mut T, buf: &mut [u8]) -> Result<usize, std::io::Error>
where
T: Read + Unpin,
{
future::poll_fn(move |cx| {
let mut buf = ReadBuf::new(buf);
ready!(Pin::new(&mut *io).poll_read(cx, buf.unfilled()))?;
Poll::Ready(Ok(buf.filled().len()))
})
.await
}

pub(crate) async fn write_all<T>(io: &mut T, buf: &[u8]) -> Result<(), std::io::Error>
where
T: Write + Unpin,
{
let mut n = 0;
future::poll_fn(move |cx| {
while n < buf.len() {
n += ready!(Pin::new(&mut *io).poll_write(cx, &buf[n..])?);
}
Poll::Ready(Ok(()))
})
.await
}
5 changes: 5 additions & 0 deletions src/rt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
//! Runtime utilities

#[cfg(feature = "client-legacy")]
mod io;
#[cfg(feature = "client-legacy")]
pub(crate) use self::io::{read, write_all};

#[cfg(feature = "tokio")]
pub mod tokio;

Expand Down
36 changes: 36 additions & 0 deletions tests/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tower_service::Service;

use hyper_util::client::legacy::connect::{proxy::Tunnel, HttpConnector};

#[tokio::test]
async fn test_tunnel_works() {
let tcp = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = tcp.local_addr().expect("local_addr");

let proxy_dst = format!("http://{}", addr).parse().expect("uri");
let mut connector = Tunnel::new(proxy_dst, HttpConnector::new());
let t1 = tokio::spawn(async move {
let _conn = connector
.call("https://hyper.rs".parse().unwrap())
.await
.expect("tunnel");
});

let t2 = tokio::spawn(async move {
let (mut io, _) = tcp.accept().await.expect("accept");
let mut buf = [0u8; 64];
let n = io.read(&mut buf).await.expect("read 1");
assert_eq!(
&buf[..n],
b"CONNECT hyper.rs:443 HTTP/1.1\r\nHost: hyper.rs:443\r\n\r\n"
);
io.write_all(b"HTTP/1.1 200 OK\r\n\r\n")
.await
.expect("write 1");
});

t1.await.expect("task 1");
t2.await.expect("task 2");
}

0 comments on commit 7237179

Please sign in to comment.