From eab53cf979cd12aed3d8cc805bf49f0e1e31d2dc Mon Sep 17 00:00:00 2001 From: Casey Primozic Date: Mon, 14 Sep 2020 00:05:57 -0700 Subject: [PATCH] Hacky gzip compression impl --- contrib/lib/Cargo.toml | 4 +- contrib/lib/src/compression/fairing.rs | 78 ++----------- contrib/lib/src/compression/mod.rs | 96 +++++++++++++-- contrib/lib/src/compression/responder.rs | 1 + contrib/lib/src/lib.rs | 41 ++++--- core/lib/src/response/response.rs | 142 ++++++++++++++--------- 6 files changed, 217 insertions(+), 145 deletions(-) diff --git a/contrib/lib/Cargo.toml b/contrib/lib/Cargo.toml index 60797a2c34..74b287e6cf 100644 --- a/contrib/lib/Cargo.toml +++ b/contrib/lib/Cargo.toml @@ -43,6 +43,8 @@ tokio = { version = "0.2.0", optional = true } rocket_contrib_codegen = { version = "0.5.0-dev", path = "../codegen", optional = true } rocket = { version = "0.5.0-dev", path = "../../core/lib/", default-features = false } log = "0.4" +lazy_static = "1.4" +futures = "0.3" # Serialization and templating dependencies. serde = { version = "1.0", optional = true } @@ -75,7 +77,7 @@ time = { version = "0.2.9", optional = true } # Compression dependencies brotli = { version = "3.3", optional = true } -flate2 = { version = "1.0", optional = true } +flate2 = { version = "1.0", optional = true, features = ["tokio"] } [package.metadata.docs.rs] all-features = true diff --git a/contrib/lib/src/compression/fairing.rs b/contrib/lib/src/compression/fairing.rs index 00f9897504..b0caf3fe24 100644 --- a/contrib/lib/src/compression/fairing.rs +++ b/contrib/lib/src/compression/fairing.rs @@ -1,26 +1,16 @@ -use rocket::config::{ConfigError, Value}; use rocket::fairing::{Fairing, Info, Kind}; use rocket::http::MediaType; -use rocket::Rocket; use rocket::{Request, Response}; -struct Context { - exclusions: Vec, -} - -impl Default for Context { - fn default() -> Context { - Context { - exclusions: vec![ - MediaType::parse_flexible("application/gzip").unwrap(), - MediaType::parse_flexible("application/zip").unwrap(), - MediaType::parse_flexible("image/*").unwrap(), - MediaType::parse_flexible("video/*").unwrap(), - MediaType::parse_flexible("application/wasm").unwrap(), - MediaType::parse_flexible("application/octet-stream").unwrap(), - ], - } - } +lazy_static! { + static ref EXCLUSIONS: Vec = vec![ + MediaType::parse_flexible("application/gzip").unwrap(), + MediaType::parse_flexible("application/zip").unwrap(), + MediaType::parse_flexible("image/*").unwrap(), + MediaType::parse_flexible("video/*").unwrap(), + MediaType::parse_flexible("application/wasm").unwrap(), + MediaType::parse_flexible("application/octet-stream").unwrap(), + ]; } /// Compresses all responses with Brotli or Gzip compression. @@ -95,6 +85,7 @@ impl Compression { } } +#[async_trait] impl Fairing for Compression { fn info(&self) -> Info { Info { @@ -103,52 +94,7 @@ impl Fairing for Compression { } } - fn on_attach(&self, rocket: Rocket) -> Result { - let mut ctxt = Context::default(); - - match rocket.config().get_table("compress").and_then(|t| { - t.get("exclude").ok_or_else(|| ConfigError::Missing(String::from("exclude"))) - }) { - Ok(excls) => match excls.as_array() { - Some(excls) => { - ctxt.exclusions = excls.iter().flat_map(|ex| { - if let Value::String(s) = ex { - let mt = MediaType::parse_flexible(s); - if mt.is_none() { - warn_!("Ignoring invalid media type '{:?}'", s); - } - mt - } else { - warn_!("Ignoring non-string media type '{:?}'", ex); - None - } - }).collect(); - } - None => { - warn_!( - "Exclusions is not an array; using default compression exclusions '{:?}'", - ctxt.exclusions - ); - } - }, - Err(ConfigError::Missing(_)) => { /* ignore missing */ } - Err(e) => { - e.pretty_print(); - warn_!( - "Using default compression exclusions '{:?}'", - ctxt.exclusions - ); - } - }; - - Ok(rocket.manage(ctxt)) - } - - fn on_response<'r>(&self, request: &'r Request<'_>, response: &mut Response<'r>) { - let context = request - .guard::>() - .expect("Compression Context registered in on_attach"); - - super::CompressionUtils::compress_response(request, response, &context.exclusions); + async fn on_response<'r>(&self, request: &'r Request<'_>, response: &mut Response<'r>) { + super::CompressionUtils::compress_response(request, response, &EXCLUSIONS); } } diff --git a/contrib/lib/src/compression/mod.rs b/contrib/lib/src/compression/mod.rs index 8ba9f3d5e1..abd94b4ecf 100644 --- a/contrib/lib/src/compression/mod.rs +++ b/contrib/lib/src/compression/mod.rs @@ -30,8 +30,10 @@ pub use self::responder::Compress; use std::io::Read; +use futures::future::FutureExt; +use futures::StreamExt; +use rocket::http::hyper::header::CONTENT_ENCODING; use rocket::http::MediaType; -use rocket::http::hyper::header::{ContentEncoding, Encoding}; use rocket::{Request, Response}; #[cfg(feature = "brotli_compression")] @@ -40,6 +42,57 @@ use brotli::enc::backward_references::BrotliEncoderMode; #[cfg(feature = "gzip_compression")] use flate2::read::GzEncoder; +pub enum Encoding { + /// The `chunked` encoding. + Chunked, + /// The `br` encoding. + Brotli, + /// The `gzip` encoding. + Gzip, + /// The `deflate` encoding. + Deflate, + /// The `compress` encoding. + Compress, + /// The `identity` encoding. + Identity, + /// The `trailers` encoding. + Trailers, + /// Some other encoding that is less common, can be any String. + EncodingExt(String), +} + +impl std::fmt::Display for Encoding { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match *self { + Encoding::Chunked => "chunked", + Encoding::Brotli => "br", + Encoding::Gzip => "gzip", + Encoding::Deflate => "deflate", + Encoding::Compress => "compress", + Encoding::Identity => "identity", + Encoding::Trailers => "trailers", + Encoding::EncodingExt(ref s) => s.as_ref(), + }) + } +} + +impl std::str::FromStr for Encoding { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + match s { + "chunked" => Ok(Encoding::Chunked), + "br" => Ok(Encoding::Brotli), + "deflate" => Ok(Encoding::Deflate), + "gzip" => Ok(Encoding::Gzip), + "compress" => Ok(Encoding::Compress), + "identity" => Ok(Encoding::Identity), + "trailers" => Ok(Encoding::Trailers), + _ => Ok(Encoding::EncodingExt(s.to_owned())), + } + } +} + struct CompressionUtils; impl CompressionUtils { @@ -56,12 +109,15 @@ impl CompressionUtils { response.headers().get("Content-Encoding").next().is_some() } - fn set_body_and_encoding<'r, B: Read + 'r>( + fn set_body_and_encoding<'r, B: rocket::tokio::io::AsyncRead + Send + 'r>( response: &mut Response<'r>, body: B, encoding: Encoding, ) { - response.set_header(ContentEncoding(vec![encoding])); + response.set_header(::rocket::http::Header::new( + CONTENT_ENCODING.as_str(), + format!("{}", encoding), + )); response.set_streamed_body(body); } @@ -81,7 +137,11 @@ impl CompressionUtils { } } - fn compress_response(request: &Request<'_>, response: &mut Response<'_>, exclusions: &[MediaType]) { + fn compress_response( + request: &Request<'_>, + response: &mut Response<'_>, + exclusions: &[MediaType], + ) { if CompressionUtils::already_encoded(response) { return; } @@ -94,7 +154,7 @@ impl CompressionUtils { // Compression is done when the request accepts brotli or gzip encoding // and the corresponding feature is enabled - if cfg!(feature = "brotli_compression") && CompressionUtils::accepts_encoding(request, "br") + /*if cfg!(feature = "brotli_compression") && CompressionUtils::accepts_encoding(request, "br") { #[cfg(feature = "brotli_compression")] { @@ -118,15 +178,33 @@ impl CompressionUtils { ); } } - } else if cfg!(feature = "gzip_compression") - && CompressionUtils::accepts_encoding(request, "gzip") + } else */ + if cfg!(feature = "gzip_compression") && CompressionUtils::accepts_encoding(request, "gzip") { #[cfg(feature = "gzip_compression")] { if let Some(plain) = response.take_body() { - let compressor = GzEncoder::new(plain.into_inner(), flate2::Compression::default()); + let body = async { + let body = plain.into_bytes().await.unwrap_or_else(Vec::new); + let mut compressor = + GzEncoder::new(body.as_slice(), flate2::Compression::default()); + let mut buf = Vec::new(); + match compressor.read_to_end(&mut buf) { + Ok(_) => (), + Err(err) => { + error!("Error compressing response with gzip: {:?}", err); + return futures::stream::iter(vec![Err(err)]); + } + } + + futures::stream::iter(vec![Ok(std::io::Cursor::new(buf))]) + } + .into_stream() + .flatten(); + + let body = tokio::io::stream_reader(body); - CompressionUtils::set_body_and_encoding(response, compressor, Encoding::Gzip); + CompressionUtils::set_body_and_encoding(response, body, Encoding::Gzip); } } } diff --git a/contrib/lib/src/compression/responder.rs b/contrib/lib/src/compression/responder.rs index bd0c0d076a..fa6323a6de 100644 --- a/contrib/lib/src/compression/responder.rs +++ b/contrib/lib/src/compression/responder.rs @@ -41,6 +41,7 @@ impl<'r, 'o: 'r, R: Responder<'r, 'o>> Responder<'r, 'o> for Compress { .merge(self.0.respond_to(request)?) .finalize(); + println!("YOU SUCK"); CompressionUtils::compress_response(request, &mut response, &[]); Ok(response) } diff --git a/contrib/lib/src/lib.rs b/contrib/lib/src/lib.rs index 72271f1c5d..3e60a88c86 100644 --- a/contrib/lib/src/lib.rs +++ b/contrib/lib/src/lib.rs @@ -1,7 +1,6 @@ #![doc(html_root_url = "https://api.rocket.rs/v0.5")] #![doc(html_favicon_url = "https://rocket.rs/images/favicon.ico")] #![doc(html_logo_url = "https://rocket.rs/images/logo-boxed.png")] - #![warn(rust_2018_idioms)] #![allow(unused_extern_crates)] @@ -40,17 +39,33 @@ //! This crate is expected to grow with time, bringing in outside crates to be //! officially supported by Rocket. -#[allow(unused_imports)] #[macro_use] extern crate log; -#[allow(unused_imports)] #[macro_use] extern crate rocket; +#[allow(unused_imports)] +#[macro_use] +extern crate log; +#[allow(unused_imports)] +#[macro_use] +extern crate rocket; +#[macro_use] +extern crate lazy_static; -#[cfg(feature="json")] #[macro_use] pub mod json; -#[cfg(feature="serve")] pub mod serve; -#[cfg(feature="msgpack")] pub mod msgpack; -#[cfg(feature="templates")] pub mod templates; -#[cfg(feature="uuid")] pub mod uuid; -#[cfg(feature="databases")] pub mod databases; -#[cfg(feature = "helmet")] pub mod helmet; -// TODO.async: Migrate compression, reenable this, tests, and add to docs. -//#[cfg(any(feature="brotli_compression", feature="gzip_compression"))] pub mod compression; +#[cfg(feature = "json")] +#[macro_use] +pub mod json; +#[cfg(any(feature = "brotli_compression", feature = "gzip_compression"))] +pub mod compression; +#[cfg(feature = "databases")] +pub mod databases; +#[cfg(feature = "helmet")] +pub mod helmet; +#[cfg(feature = "msgpack")] +pub mod msgpack; +#[cfg(feature = "serve")] +pub mod serve; +#[cfg(feature = "templates")] +pub mod templates; +#[cfg(feature = "uuid")] +pub mod uuid; -#[cfg(feature="databases")] #[doc(hidden)] pub use rocket_contrib_codegen::*; +#[cfg(feature = "databases")] +#[doc(hidden)] +pub use rocket_contrib_codegen::*; diff --git a/core/lib/src/response/response.rs b/core/lib/src/response/response.rs index d29066f243..56437d7328 100644 --- a/core/lib/src/response/response.rs +++ b/core/lib/src/response/response.rs @@ -1,11 +1,11 @@ -use std::{io, fmt, str}; use std::borrow::Cow; use std::pin::Pin; +use std::{fmt, io, str}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use crate::http::{ContentType, Cookie, Header, HeaderMap, Status}; use crate::response::{self, Responder}; -use crate::http::{Header, HeaderMap, Status, ContentType, Cookie}; /// The default size, in bytes, of a chunk for streamed responses. pub const DEFAULT_CHUNK_SIZE: usize = 4096; @@ -15,7 +15,7 @@ pub enum Body { /// A fixed-size body. Sized(A, Option), /// A streamed/chunked body, akin to `Transfer-Encoding: chunked`. - Chunked(B, usize) + Chunked(B, usize), } impl Body { @@ -23,7 +23,7 @@ impl Body { pub fn as_mut(&mut self) -> Body<&mut A, &mut B> { match *self { Body::Sized(ref mut a, n) => Body::Sized(a, n), - Body::Chunked(ref mut b, n) => Body::Chunked(b, n) + Body::Chunked(ref mut b, n) => Body::Chunked(b, n), } } @@ -34,7 +34,7 @@ impl Body { pub fn map U, F2: FnOnce(B) -> U>(self, f1: F1, f2: F2) -> Body { match self { Body::Sized(a, n) => Body::Sized(f1(a), n), - Body::Chunked(b, n) => Body::Chunked(f2(b), n) + Body::Chunked(b, n) => Body::Chunked(f2(b), n), } } @@ -59,19 +59,20 @@ impl Body { /// Consumes `self` and returns the inner body. pub fn into_inner(self) -> T { match self { - Body::Sized(b, _) | Body::Chunked(b, _) => b + Body::Sized(b, _) | Body::Chunked(b, _) => b, } } } impl Body - where A: AsyncRead + AsyncSeek + Send + Unpin, - B: AsyncRead + Send + Unpin +where + A: AsyncRead + AsyncSeek + Send + Unpin, + B: AsyncRead + Send + Unpin, { pub fn known_size(&self) -> Option { match self { Body::Sized(_, Some(known)) => Some(*known), - _ => None + _ => None, } } @@ -82,12 +83,15 @@ impl Body if let Body::Sized(body, size) = self { match *size { Some(size) => Some(size), - None => async { - let pos = body.seek(io::SeekFrom::Current(0)).await.ok()?; - let end = body.seek(io::SeekFrom::End(0)).await.ok()?; - body.seek(io::SeekFrom::Start(pos)).await.ok()?; - Some(end as usize - pos as usize) - }.await + None => { + async { + let pos = body.seek(io::SeekFrom::Current(0)).await.ok()?; + let end = body.seek(io::SeekFrom::End(0)).await.ok()?; + body.seek(io::SeekFrom::Start(pos)).await.ok()?; + Some(end as usize - pos as usize) + } + .await + } } } else { None @@ -98,7 +102,9 @@ impl Body /// Send + Unpin)`. pub fn as_reader(&mut self) -> &mut (dyn AsyncRead + Send + Unpin) { type Reader<'a> = &'a mut (dyn AsyncRead + Send + Unpin); - self.as_mut().map(|a| a as Reader<'_>, |b| b as Reader<'_>).into_inner() + self.as_mut() + .map(|a| a as Reader<'_>, |b| b as Reader<'_>) + .into_inner() } /// Attempts to read `self` into a `Vec` and returns it. If reading fails, @@ -116,13 +122,15 @@ impl Body /// Attempts to read `self` into a `String` and returns it. If reading or /// conversion fails, returns `None`. pub async fn into_string(self) -> Option { - self.into_bytes().await.and_then(|bytes| match String::from_utf8(bytes) { - Ok(string) => Some(string), - Err(e) => { - error_!("Body is invalid UTF-8: {}", e); - None - } - }) + self.into_bytes() + .await + .and_then(|bytes| match String::from_utf8(bytes) { + Ok(string) => Some(string), + Err(e) => { + error_!("Body is invalid UTF-8: {}", e); + None + } + }) } } @@ -212,9 +220,7 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn new(base: Response<'r>) -> ResponseBuilder<'r> { - ResponseBuilder { - response: base, - } + ResponseBuilder { response: base } } /// Sets the status of the `Response` being built to `status`. @@ -280,7 +286,8 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn header<'h: 'r, H>(&mut self, header: H) -> &mut ResponseBuilder<'r> - where H: Into> + where + H: Into>, { self.response.set_header(header); self @@ -311,7 +318,8 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn header_adjoin<'h: 'r, H>(&mut self, header: H) -> &mut ResponseBuilder<'r> - where H: Into> + where + H: Into>, { self.response.adjoin_header(header); self @@ -336,7 +344,11 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn raw_header<'a, 'b, N, V>(&mut self, name: N, value: V) -> &mut ResponseBuilder<'r> - where N: Into>, V: Into>, 'a: 'r, 'b: 'r + where + N: Into>, + V: Into>, + 'a: 'r, + 'b: 'r, { self.response.set_raw_header(name, value); self @@ -362,7 +374,11 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn raw_header_adjoin<'a, 'b, N, V>(&mut self, name: N, value: V) -> &mut ResponseBuilder<'r> - where N: Into>, V: Into>, 'a: 'r, 'b: 'r + where + N: Into>, + V: Into>, + 'a: 'r, + 'b: 'r, { self.response.adjoin_raw_header(name, value); self @@ -385,8 +401,9 @@ impl<'r> ResponseBuilder<'r> { /// .finalize(); /// ``` pub fn sized_body(&mut self, size: S, body: B) -> &mut ResponseBuilder<'r> - where B: AsyncRead + AsyncSeek + Send + Unpin + 'r, - S: Into> + where + B: AsyncRead + AsyncSeek + Send + Unpin + 'r, + S: Into>, { self.response.set_sized_body(size, body); self @@ -406,7 +423,8 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn streamed_body(&mut self, body: B) -> &mut ResponseBuilder<'r> - where B: AsyncRead + Send + 'r + where + B: AsyncRead + Send + 'r, { self.response.set_streamed_body(body); self @@ -431,7 +449,8 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn chunked_body(&mut self, body: B, chunk_size: usize) -> &mut ResponseBuilder<'r> - where B: AsyncRead + Send + 'r + where + B: AsyncRead + Send + 'r, { self.response.set_chunked_body(body, chunk_size); self @@ -455,8 +474,9 @@ impl<'r> ResponseBuilder<'r> { /// ``` #[inline(always)] pub fn raw_body(&mut self, body: Body) -> &mut ResponseBuilder<'r> - where S: AsyncRead + AsyncSeek + Send + Unpin + 'r, - C: AsyncRead + Send + Unpin + 'r + where + S: AsyncRead + AsyncSeek + Send + Unpin + 'r, + C: AsyncRead + Send + Unpin + 'r, { self.response.set_raw_body(body); self @@ -584,13 +604,11 @@ impl<'r> ResponseBuilder<'r> { } } -pub trait AsyncReadSeek: AsyncRead + AsyncSeek { } -impl AsyncReadSeek for T { } +pub trait AsyncReadSeek: AsyncRead + AsyncSeek {} +impl AsyncReadSeek for T {} -pub type ResponseBody<'r> = Body< - Pin>, - Pin> ->; +pub type ResponseBody<'r> = + Body>, Pin>>; /// A response, as returned by types implementing [`Responder`]. #[derive(Default)] @@ -709,7 +727,9 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn content_type(&self) -> Option { - self.headers().get_one("Content-Type").and_then(|v| v.parse().ok()) + self.headers() + .get_one("Content-Type") + .and_then(|v| v.parse().ok()) } /// Sets the status of `self` to a custom `status` with status code `code` @@ -827,7 +847,9 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn set_raw_header<'a: 'r, 'b: 'r, N, V>(&mut self, name: N, value: V) -> bool - where N: Into>, V: Into> + where + N: Into>, + V: Into>, { self.set_header(Header::new(name, value)) } @@ -884,7 +906,9 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn adjoin_raw_header<'a: 'r, 'b: 'r, N, V>(&mut self, name: N, value: V) - where N: Into>, V: Into> + where + N: Into>, + V: Into>, { self.adjoin_header(Header::new(name, value)); } @@ -1051,7 +1075,7 @@ impl<'r> Response<'r> { if let Some(body) = self.take_body() { self.body = match body { Body::Sized(_, n) => Some(Body::Sized(Box::pin(io::Cursor::new(&[])), n)), - Body::Chunked(..) => None + Body::Chunked(..) => None, }; } } @@ -1076,8 +1100,9 @@ impl<'r> Response<'r> { /// # }) /// ``` pub fn set_sized_body(&mut self, size: S, body: B) - where B: AsyncRead + AsyncSeek + Send + Unpin + 'r, - S: Into> + where + B: AsyncRead + AsyncSeek + Send + Unpin + 'r, + S: Into>, { self.body = Some(Body::Sized(Box::pin(body), size.into())); } @@ -1094,13 +1119,16 @@ impl<'r> Response<'r> { /// use rocket::Response; /// /// # rocket::async_test(async { - /// let mut response = Response::new(); - /// response.set_streamed_body(repeat(97).take(5)); - /// assert_eq!(response.body_string().await.unwrap(), "aaaaa"); + /// let mut response = Response::new(); + /// response.set_streamed_body(repeat(97).take(5)); + /// assert_eq!(response.body_string().await.unwrap(), "aaaaa"); /// # }) /// ``` #[inline(always)] - pub fn set_streamed_body(&mut self, body: B) where B: AsyncRead + Send + 'r { + pub fn set_streamed_body(&mut self, body: B) + where + B: AsyncRead + Send + 'r, + { self.set_chunked_body(body, DEFAULT_CHUNK_SIZE); } @@ -1121,7 +1149,8 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn set_chunked_body(&mut self, body: B, chunk_size: usize) - where B: AsyncRead + Send + 'r + where + B: AsyncRead + Send + 'r, { self.body = Some(Body::Chunked(Box::pin(body), chunk_size)); } @@ -1148,8 +1177,9 @@ impl<'r> Response<'r> { /// ``` #[inline(always)] pub fn set_raw_body(&mut self, body: Body) - where S: AsyncRead + AsyncSeek + Send + Unpin + 'r, - C: AsyncRead + Send + Unpin + 'r + where + S: AsyncRead + AsyncSeek + Send + Unpin + 'r, + C: AsyncRead + Send + Unpin + 'r, { self.body = Some(match body { Body::Sized(a, n) => Body::Sized(Box::pin(a), n), @@ -1258,7 +1288,7 @@ impl fmt::Debug for Response<'_> { match self.body { Some(ref body) => body.fmt(f), - None => writeln!(f, "Empty Body") + None => writeln!(f, "Empty Body"), } } }