Skip to content

Commit 334209d

Browse files
feat(server): add AutoConnection (#11)
A way to auto detect HTTP version from the client.
1 parent 229757e commit 334209d

File tree

7 files changed

+684
-0
lines changed

7 files changed

+684
-0
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ hyper = "=1.0.0-rc.4"
1919
futures-channel = "0.3"
2020
futures-util = { version = "0.3", default-features = false }
2121
http = "0.2"
22+
http-body = "1.0.0-rc.2"
23+
bytes = "1"
2224

2325
once_cell = "1.14"
2426

@@ -30,9 +32,11 @@ tower-service = "0.3"
3032
tower = { version = "0.4", features = ["make", "util"] }
3133

3234
[dev-dependencies]
35+
hyper = { version = "1.0.0-rc.3", features = ["full"] }
3336
bytes = "1"
3437
http-body-util = "0.1.0-rc.3"
3538
tokio = { version = "1", features = ["macros", "test-util"] }
39+
tokio-test = "0.4"
3640

3741
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
3842
pnet_datalink = "0.27.2"
@@ -50,6 +54,7 @@ http1 = ["hyper/http1"]
5054
http2 = ["hyper/http2"]
5155

5256
tcp = []
57+
auto = ["hyper/server", "http1", "http2"]
5358
runtime = []
5459

5560
# internal features used in CI

src/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod exec;
1414
#[cfg(feature = "client")]
1515
mod lazy;
1616
pub(crate) mod never;
17+
pub(crate) mod rewind;
1718
#[cfg(feature = "client")]
1819
mod sync;
1920

src/common/rewind.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
use std::marker::Unpin;
2+
use std::{cmp, io};
3+
4+
use bytes::{Buf, Bytes};
5+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
6+
7+
use std::{
8+
pin::Pin,
9+
task::{self, Poll},
10+
};
11+
12+
/// Combine a buffer with an IO, rewinding reads to use the buffer.
13+
#[derive(Debug)]
14+
pub(crate) struct Rewind<T> {
15+
pre: Option<Bytes>,
16+
inner: T,
17+
}
18+
19+
impl<T> Rewind<T> {
20+
#[cfg(test)]
21+
pub(crate) fn new(io: T) -> Self {
22+
Rewind {
23+
pre: None,
24+
inner: io,
25+
}
26+
}
27+
28+
#[allow(dead_code)]
29+
pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self {
30+
Rewind {
31+
pre: Some(buf),
32+
inner: io,
33+
}
34+
}
35+
36+
#[cfg(test)]
37+
pub(crate) fn rewind(&mut self, bs: Bytes) {
38+
debug_assert!(self.pre.is_none());
39+
self.pre = Some(bs);
40+
}
41+
42+
// pub(crate) fn into_inner(self) -> (T, Bytes) {
43+
// (self.inner, self.pre.unwrap_or_else(Bytes::new))
44+
// }
45+
46+
// pub(crate) fn get_mut(&mut self) -> &mut T {
47+
// &mut self.inner
48+
// }
49+
}
50+
51+
impl<T> AsyncRead for Rewind<T>
52+
where
53+
T: AsyncRead + Unpin,
54+
{
55+
fn poll_read(
56+
mut self: Pin<&mut Self>,
57+
cx: &mut task::Context<'_>,
58+
buf: &mut ReadBuf<'_>,
59+
) -> Poll<io::Result<()>> {
60+
if let Some(mut prefix) = self.pre.take() {
61+
// If there are no remaining bytes, let the bytes get dropped.
62+
if !prefix.is_empty() {
63+
let copy_len = cmp::min(prefix.len(), buf.remaining());
64+
// TODO: There should be a way to do following two lines cleaner...
65+
buf.put_slice(&prefix[..copy_len]);
66+
prefix.advance(copy_len);
67+
// Put back what's left
68+
if !prefix.is_empty() {
69+
self.pre = Some(prefix);
70+
}
71+
72+
return Poll::Ready(Ok(()));
73+
}
74+
}
75+
Pin::new(&mut self.inner).poll_read(cx, buf)
76+
}
77+
}
78+
79+
impl<T> AsyncWrite for Rewind<T>
80+
where
81+
T: AsyncWrite + Unpin,
82+
{
83+
fn poll_write(
84+
mut self: Pin<&mut Self>,
85+
cx: &mut task::Context<'_>,
86+
buf: &[u8],
87+
) -> Poll<io::Result<usize>> {
88+
Pin::new(&mut self.inner).poll_write(cx, buf)
89+
}
90+
91+
fn poll_write_vectored(
92+
mut self: Pin<&mut Self>,
93+
cx: &mut task::Context<'_>,
94+
bufs: &[io::IoSlice<'_>],
95+
) -> Poll<io::Result<usize>> {
96+
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
97+
}
98+
99+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
100+
Pin::new(&mut self.inner).poll_flush(cx)
101+
}
102+
103+
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
104+
Pin::new(&mut self.inner).poll_shutdown(cx)
105+
}
106+
107+
fn is_write_vectored(&self) -> bool {
108+
self.inner.is_write_vectored()
109+
}
110+
}
111+
112+
#[cfg(test)]
113+
mod tests {
114+
// FIXME: re-implement tests with `async/await`, this import should
115+
// trigger a warning to remind us
116+
use super::Rewind;
117+
use bytes::Bytes;
118+
use tokio::io::AsyncReadExt;
119+
120+
#[cfg(not(miri))]
121+
#[tokio::test]
122+
async fn partial_rewind() {
123+
let underlying = [104, 101, 108, 108, 111];
124+
125+
let mock = tokio_test::io::Builder::new().read(&underlying).build();
126+
127+
let mut stream = Rewind::new(mock);
128+
129+
// Read off some bytes, ensure we filled o1
130+
let mut buf = [0; 2];
131+
stream.read_exact(&mut buf).await.expect("read1");
132+
133+
// Rewind the stream so that it is as if we never read in the first place.
134+
stream.rewind(Bytes::copy_from_slice(&buf[..]));
135+
136+
let mut buf = [0; 5];
137+
stream.read_exact(&mut buf).await.expect("read1");
138+
139+
// At this point we should have read everything that was in the MockStream
140+
assert_eq!(&buf, &underlying);
141+
}
142+
143+
#[cfg(not(miri))]
144+
#[tokio::test]
145+
async fn full_rewind() {
146+
let underlying = [104, 101, 108, 108, 111];
147+
148+
let mock = tokio_test::io::Builder::new().read(&underlying).build();
149+
150+
let mut stream = Rewind::new(mock);
151+
152+
let mut buf = [0; 5];
153+
stream.read_exact(&mut buf).await.expect("read1");
154+
155+
// Rewind the stream so that it is as if we never read in the first place.
156+
stream.rewind(Bytes::copy_from_slice(&buf[..]));
157+
158+
let mut buf = [0; 5];
159+
stream.read_exact(&mut buf).await.expect("read1");
160+
}
161+
}

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
#![deny(missing_docs)]
2+
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
23

34
//! hyper-util
45
56
#[cfg(feature = "client")]
67
pub mod client;
78
mod common;
89
pub mod rt;
10+
pub mod server;
11+
12+
mod error;

0 commit comments

Comments
 (0)