Skip to content

Commit

Permalink
Merge pull request #28 from quasiyoke/feature/http-crate-1.0
Browse files Browse the repository at this point in the history
Upgrade http crate to 1.0, fix examples
  • Loading branch information
messense authored Jun 17, 2024
2 parents 39dd4f3 + cf86ab8 commit d9dfc90
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 131 deletions.
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ required-features = [ "warp-compat" ]
bytes = "1.0.1"
futures-util = "0.3.16"
futures-channel = "0.3.16"
headers = "0.3.0"
headers = "0.4.0"
htmlescape = "0.3.1"
http = "0.2.3"
http-body = "0.4.0"
http = "1.0.0"
http-body = "1.0.0"
http-body-util = "0.1.0"
lazy_static = "1.4.0"
libc = { version = "0.2.0", optional = true }
log = "0.4.0"
Expand All @@ -58,12 +59,13 @@ uuid = { version = "1.1.2", features = ["v4"] }
xml-rs = "0.8.0"
xmltree = "0.10.0"

hyper = { version = "0.14.0", optional = true }
hyper = { version = "1.1.0", optional = true }
warp = { version = "0.3.0", optional = true, default-features = false }
actix-web = { version = "4.0.0-beta.15", optional = true }

[dev-dependencies]
clap = { version = "4.0.0", features = ["derive"] }
env_logger = "0.11.0"
hyper = { version = "0.14.0", features = [ "http1", "http2", "server", "stream", "runtime" ] }
hyper = { version = "1.1.0", features = ["http1", "server"] }
hyper-util = { version = "0.1.2", features = ["tokio"] }
tokio = { version = "1.3.0", features = ["full"] }
56 changes: 39 additions & 17 deletions examples/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,53 @@
use std::{convert::Infallible, net::SocketAddr};

use hyper::{server::conn::http1, service::service_fn};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

use dav_server::{fakels::FakeLs, localfs::LocalFs, DavHandler};
use std::convert::Infallible;

#[tokio::main]
async fn main() {
env_logger::init();
let dir = "/tmp";
let addr = ([127, 0, 0, 1], 4918).into();
let addr: SocketAddr = ([127, 0, 0, 1], 4918).into();

let dav_server = DavHandler::builder()
.filesystem(LocalFs::new(dir, false, false, false))
.locksystem(FakeLs::new())
.build_handler();

let make_service = hyper::service::make_service_fn(move |_| {
let listener = TcpListener::bind(addr).await.unwrap();

println!("Listening {addr}");

// We start a loop to continuously accept incoming connections
loop {
let (stream, _) = listener.accept().await.unwrap();
let dav_server = dav_server.clone();
async move {
let func = move |req| {
let dav_server = dav_server.clone();
async move { Ok::<_, Infallible>(dav_server.handle(req).await) }
};
Ok::<_, Infallible>(hyper::service::service_fn(func))
}
});

println!("hyper example: listening on {:?} serving {}", addr, dir);
let _ = hyper::Server::bind(&addr)
.serve(make_service)
.await
.map_err(|e| eprintln!("server error: {}", e));

// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = http1::Builder::new()
// `service_fn` converts our function in a `Service`
.serve_connection(
io,
service_fn({
move |req| {
let dav_server = dav_server.clone();
async move { Ok::<_, Infallible>(dav_server.handle(req).await) }
}
}),
)
.await
{
eprintln!("Failed serving: {err:?}");
}
});
}
}
61 changes: 37 additions & 24 deletions examples/sample-litmus-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
// Connect to http://localhost:4918/
//

use std::convert::Infallible;
use std::error::Error;
use std::net::SocketAddr;
use std::str::FromStr;
use std::{convert::Infallible, error::Error, net::SocketAddr};

use clap::Parser;
use futures_util::future::TryFutureExt;
use headers::{authorization::Basic, Authorization, HeaderMapExt};
use hyper::{server::conn::http1, service::service_fn};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

use dav_server::{body::Body, fakels, localfs, memfs, memls, DavConfig, DavHandler};

Expand Down Expand Up @@ -45,7 +44,7 @@ impl Server {

async fn handle(
&self,
req: hyper::Request<hyper::Body>,
req: hyper::Request<hyper::body::Incoming>,
) -> Result<hyper::Response<Body>, Infallible> {
let user = if self.auth {
// we want the client to authenticate.
Expand Down Expand Up @@ -112,26 +111,40 @@ async fn main() -> Result<(), Box<dyn Error>> {
let fakels = args.fakels;

let dav_server = Server::new(dir.to_string(), memls, fakels, auth);
let make_service = hyper::service::make_service_fn(|_| {
let dav_server = dav_server.clone();
async move {
let func = move |req| {
let dav_server = dav_server.clone();
async move { dav_server.clone().handle(req).await }
};
Ok::<_, hyper::Error>(hyper::service::service_fn(func))
}
});

let port = args.port;
let addr = format!("0.0.0.0:{}", port);
let addr = SocketAddr::from_str(&addr)?;

let server = hyper::Server::try_bind(&addr)?
.serve(make_service)
.map_err(|e| eprintln!("server error: {}", e));
let addr: SocketAddr = ([0, 0, 0, 0], port).into();
let listener = TcpListener::bind(addr).await?;

println!("Serving {} on {}", name, port);
let _ = server.await;
Ok(())

// We start a loop to continuously accept incoming connections
loop {
let (stream, _) = listener.accept().await?;
let dav_server = dav_server.clone();

// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = http1::Builder::new()
// `service_fn` converts our function in a `Service`
.serve_connection(
io,
service_fn({
move |req| {
let dav_server = dav_server.clone();
async move { dav_server.clone().handle(req).await }
}
}),
)
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
}
62 changes: 34 additions & 28 deletions src/actix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
//! }
//! ```
//!
use std::io;

use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
convert::TryInto,
io,
pin::Pin,
task::{Context, Poll},
};

use actix_web::body::BoxBody;
use actix_web::error::PayloadError;
Expand Down Expand Up @@ -45,11 +47,11 @@ impl FromRequest for DavRequest {

fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future {
let mut builder = http::Request::builder()
.method(req.method().to_owned())
.uri(req.uri().to_owned())
.version(req.version().to_owned());
.method(req.method().as_ref())
.uri(req.uri().to_string())
.version(from_actix_http_version(req.version()));
for (name, value) in req.headers().iter() {
builder = builder.header(name, value);
builder = builder.header(name.as_str(), value.as_ref());
}
let path = req.match_info().unprocessed();
let tail = req.match_info().unprocessed();
Expand Down Expand Up @@ -82,29 +84,20 @@ impl http_body::Body for DavBody {
type Data = Bytes;
type Error = io::Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();
match this.body.poll_next(cx) {
Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(data))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(match err {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
self.project()
.body
.poll_next(cx)
.map_ok(http_body::Frame::data)
.map_err(|err| match err {
PayloadError::Incomplete(Some(err)) => err,
PayloadError::Incomplete(None) => io::ErrorKind::BrokenPipe.into(),
PayloadError::Io(err) => err,
other => io::Error::new(io::ErrorKind::Other, format!("{:?}", other)),
}))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
err => io::Error::new(io::ErrorKind::Other, format!("{err:?}")),
})
}
}

Expand All @@ -126,9 +119,9 @@ impl actix_web::Responder for DavResponse {
use crate::body::{Body, BodyType};

let (parts, body) = self.0.into_parts();
let mut builder = HttpResponse::build(parts.status);
let mut builder = HttpResponse::build(parts.status.as_u16().try_into().unwrap());
for (name, value) in parts.headers.into_iter() {
builder.append_header((name.unwrap(), value));
builder.append_header((name.unwrap().as_str(), value.as_ref()));
}
// I noticed that actix-web returns an empty chunked body
// (\r\n0\r\n\r\n) and _no_ Transfer-Encoding header on
Expand All @@ -144,3 +137,16 @@ impl actix_web::Responder for DavResponse {
resp
}
}

/// Converts HTTP version from `actix_web` version of `http` crate while `actix_web` remains on old version.
/// https://github.com/actix/actix-web/issues/3384
fn from_actix_http_version(v: actix_web::http::Version) -> http::Version {
match v {
actix_web::http::Version::HTTP_3 => http::Version::HTTP_3,
actix_web::http::Version::HTTP_2 => http::Version::HTTP_2,
actix_web::http::Version::HTTP_11 => http::Version::HTTP_11,
actix_web::http::Version::HTTP_10 => http::Version::HTTP_10,
actix_web::http::Version::HTTP_09 => http::Version::HTTP_09,
v => unreachable!("unexpected HTTP version {:?}", v),
}
}
29 changes: 7 additions & 22 deletions src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures_util::stream::Stream;
use http::header::HeaderMap;
use http_body::Body as HttpBody;

use crate::async_stream::AsyncStream;
Expand Down Expand Up @@ -52,18 +51,11 @@ impl HttpBody for Body {
type Data = Bytes;
type Error = io::Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.poll_next(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
self.poll_next(cx).map_ok(http_body::Frame::data)
}
}

Expand Down Expand Up @@ -119,19 +111,12 @@ where
type Data = ReqData;
type Error = ReqError;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
let this = self.project();
this.body.poll_next(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
this.body.poll_next(cx).map_ok(http_body::Frame::data)
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/davhandler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures_util::stream::Stream;
use headers::HeaderMapExt;
use http::{Request, Response, StatusCode};
use http_body::Body as HttpBody;
use http_body_util::BodyExt;

use crate::body::{Body, StreamBody};
use crate::davheaders;
Expand Down Expand Up @@ -375,13 +376,19 @@ impl DavInner {
{
let mut data = Vec::new();
pin_utils::pin_mut!(body);
while let Some(res) = body.data().await {
let mut buf = res.map_err(|_| {

while let Some(res) = body.frame().await {
let mut data_frame = res.map_err(|_| {
DavError::IoError(io::Error::new(
io::ErrorKind::UnexpectedEof,
"UnexpectedEof",
))
})?;

let Some(buf) = data_frame.data_mut() else {
continue;
};

while buf.has_remaining() {
if data.len() + buf.remaining() > max_size {
return Err(StatusCode::PAYLOAD_TOO_LARGE.into());
Expand Down
2 changes: 1 addition & 1 deletion src/handle_gethead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl crate::DavInner {
if let Some(r) = req.headers().typed_get::<headers::Range>() {
trace!("handle_gethead: range header {:?}", r);
use std::ops::Bound::*;
for range in r.iter() {
for range in r.satisfiable_ranges(len) {
let (start, mut count, valid) = match range {
(Included(s), Included(e)) if e >= s => (s, e - s + 1, true),
(Included(s), Unbounded) if s <= len => (s, len - s, true),
Expand Down
Loading

0 comments on commit d9dfc90

Please sign in to comment.