-
-
Notifications
You must be signed in to change notification settings - Fork 154
feat(server): add AutoConnection #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
c065585
feat(server): add AutoConn
programatik29 124fdb8
rename to AutoConnection, split code to modules and minor code improv…
programatik29 f0c9736
move http-body-util to dev-dependencies
programatik29 6fca103
builder api and replace PrependAsyncRead with Rewind
programatik29 d6aafb9
fix miri CI
programatik29 99ee112
merge master branch
programatik29 29b9651
new builder style
programatik29 a72b999
arrange features
programatik29 e05b091
update dependencies
programatik29 3bb1aa5
allow passing custom executor
programatik29 5dcd704
merge master branch
programatik29 ee6505a
Merge branch 'master' into auto-conn
seanmonstar 2cda709
fix merge conflicts
programatik29 e58bbad
Merge branch 'master' into auto-conn
programatik29 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
use std::marker::Unpin; | ||
use std::{cmp, io}; | ||
|
||
use bytes::{Buf, Bytes}; | ||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; | ||
|
||
use std::{ | ||
pin::Pin, | ||
task::{self, Poll}, | ||
}; | ||
|
||
/// Combine a buffer with an IO, rewinding reads to use the buffer. | ||
#[derive(Debug)] | ||
pub(crate) struct Rewind<T> { | ||
pre: Option<Bytes>, | ||
inner: T, | ||
} | ||
|
||
impl<T> Rewind<T> { | ||
#[cfg(test)] | ||
pub(crate) fn new(io: T) -> Self { | ||
Rewind { | ||
pre: None, | ||
inner: io, | ||
} | ||
} | ||
|
||
#[allow(dead_code)] | ||
pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { | ||
Rewind { | ||
pre: Some(buf), | ||
inner: io, | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
pub(crate) fn rewind(&mut self, bs: Bytes) { | ||
debug_assert!(self.pre.is_none()); | ||
self.pre = Some(bs); | ||
} | ||
|
||
// pub(crate) fn into_inner(self) -> (T, Bytes) { | ||
// (self.inner, self.pre.unwrap_or_else(Bytes::new)) | ||
// } | ||
|
||
// pub(crate) fn get_mut(&mut self) -> &mut T { | ||
// &mut self.inner | ||
// } | ||
} | ||
|
||
impl<T> AsyncRead for Rewind<T> | ||
where | ||
T: AsyncRead + Unpin, | ||
{ | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut task::Context<'_>, | ||
buf: &mut ReadBuf<'_>, | ||
) -> Poll<io::Result<()>> { | ||
if let Some(mut prefix) = self.pre.take() { | ||
// If there are no remaining bytes, let the bytes get dropped. | ||
if !prefix.is_empty() { | ||
let copy_len = cmp::min(prefix.len(), buf.remaining()); | ||
// TODO: There should be a way to do following two lines cleaner... | ||
buf.put_slice(&prefix[..copy_len]); | ||
prefix.advance(copy_len); | ||
// Put back what's left | ||
if !prefix.is_empty() { | ||
self.pre = Some(prefix); | ||
} | ||
|
||
return Poll::Ready(Ok(())); | ||
} | ||
} | ||
Pin::new(&mut self.inner).poll_read(cx, buf) | ||
} | ||
} | ||
|
||
impl<T> AsyncWrite for Rewind<T> | ||
where | ||
T: AsyncWrite + Unpin, | ||
{ | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut task::Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<io::Result<usize>> { | ||
Pin::new(&mut self.inner).poll_write(cx, buf) | ||
} | ||
|
||
fn poll_write_vectored( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut task::Context<'_>, | ||
bufs: &[io::IoSlice<'_>], | ||
) -> Poll<io::Result<usize>> { | ||
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) | ||
} | ||
|
||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { | ||
Pin::new(&mut self.inner).poll_flush(cx) | ||
} | ||
|
||
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { | ||
Pin::new(&mut self.inner).poll_shutdown(cx) | ||
} | ||
|
||
fn is_write_vectored(&self) -> bool { | ||
self.inner.is_write_vectored() | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
// FIXME: re-implement tests with `async/await`, this import should | ||
// trigger a warning to remind us | ||
use super::Rewind; | ||
use bytes::Bytes; | ||
use tokio::io::AsyncReadExt; | ||
|
||
#[cfg(not(miri))] | ||
#[tokio::test] | ||
async fn partial_rewind() { | ||
let underlying = [104, 101, 108, 108, 111]; | ||
|
||
let mock = tokio_test::io::Builder::new().read(&underlying).build(); | ||
|
||
let mut stream = Rewind::new(mock); | ||
|
||
// Read off some bytes, ensure we filled o1 | ||
let mut buf = [0; 2]; | ||
stream.read_exact(&mut buf).await.expect("read1"); | ||
|
||
// Rewind the stream so that it is as if we never read in the first place. | ||
stream.rewind(Bytes::copy_from_slice(&buf[..])); | ||
|
||
let mut buf = [0; 5]; | ||
stream.read_exact(&mut buf).await.expect("read1"); | ||
|
||
// At this point we should have read everything that was in the MockStream | ||
assert_eq!(&buf, &underlying); | ||
} | ||
|
||
#[cfg(not(miri))] | ||
#[tokio::test] | ||
async fn full_rewind() { | ||
let underlying = [104, 101, 108, 108, 111]; | ||
|
||
let mock = tokio_test::io::Builder::new().read(&underlying).build(); | ||
|
||
let mut stream = Rewind::new(mock); | ||
|
||
let mut buf = [0; 5]; | ||
stream.read_exact(&mut buf).await.expect("read1"); | ||
|
||
// Rewind the stream so that it is as if we never read in the first place. | ||
stream.rewind(Bytes::copy_from_slice(&buf[..])); | ||
|
||
let mut buf = [0; 5]; | ||
stream.read_exact(&mut buf).await.expect("read1"); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,12 @@ | ||
#![deny(missing_docs)] | ||
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))] | ||
|
||
//! hyper-util | ||
|
||
#[cfg(feature = "client")] | ||
pub mod client; | ||
mod common; | ||
pub mod rt; | ||
pub mod server; | ||
|
||
mod error; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.