Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement automatic decompression of responses #125

Merged
merged 18 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3574c39
Add a test case for automatically decompressing response data.
acw Feb 8, 2022
9a07aec
Wire up the auto-decompression flag into the request extensions.
acw Feb 8, 2022
2bb45ff
[WIP] A probably-wrong variant of `Chunk` for decompressing data on t…
acw Feb 8, 2022
dac9806
Clean up the decompression polling logic, using BytesMut instead of Vec.
acw Feb 8, 2022
ec487ed
Wire everything up, and get the basic tests working.
acw Feb 8, 2022
88146fb
Put the whole `CompressedHttpBody` back into the queue, not just the …
acw Feb 8, 2022
7aa1e6d
Switch to a more open-ended `ViceroyRequestMetadata` for holding exte…
acw Feb 8, 2022
767e04d
Add tests for the other two `send` variants.
acw Feb 8, 2022
ad9b061
Make the error case test more closely match the Compute@Edge implemen…
acw Feb 8, 2022
e92afab
Make sure decompression happens correctly when sending out of WASM.
acw Feb 8, 2022
936689b
Formatting oops.
acw Feb 8, 2022
6298ffe
Address a Clippy concern with field initialization and default().
acw Feb 8, 2022
6ec48e7
Provisionally update the lockfiles.
acw Feb 8, 2022
b76a120
Revert "Provisionally update the lockfiles."
acw Feb 8, 2022
3ae231b
Try a simpler update to just the trap-test lockfile.
acw Feb 9, 2022
151574d
Use a string in the test fixture, rather than reading a text file.
acw Feb 9, 2022
cfd9684
Handle "x-gzip" content encodings, as well as "gzip".
acw Feb 9, 2022
80597bc
Clean up the comments around `auto_decompress_response_set, add a fut…
acw Feb 9, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions cli/tests/trap-test/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions cli/tests/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,34 @@ async fn override_host_works() -> TestResult {

Ok(())
}

/// Test that we can transparently gunzip responses when required.
#[tokio::test(flavor = "multi_thread")]
async fn transparent_gunzip() -> TestResult {
let resp = Test::using_fixture("gzipped-response.wasm")
.backend("echo", "http://127.0.0.1:9000/", None)
.host(9000, |mut req| {
let mut response_builder = Response::builder();

for (key, value) in req.headers_mut().drain() {
if let Some(real_key) = key {
response_builder = response_builder.header(real_key, value);
}
}

response_builder
.status(StatusCode::OK)
.body(req.into_body())
.unwrap()
})
.against_empty()
.await;

assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
"hello, world!\n",
resp.into_body().read_into_string().await.unwrap()
);

Ok(())
}
1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bytesize = "1.0.0"
cfg-if = "1.0"
cranelift-entity = "0.77.0"
fastly-shared = "0.3.2"
flate2 = "1.0.22"
futures = "0.3.5"
http = "0.2.1"
http-body = "0.4.0"
Expand Down
59 changes: 54 additions & 5 deletions lib/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
//! Body type, for request and response bodies.

use {
crate::error,
bytes::{BufMut, BytesMut},
flate2::write::GzDecoder,
futures::pin_mut,
http::header::HeaderMap,
http_body::{Body as HttpBody, SizeHint},
std::{
collections::VecDeque,
io::Write,
pin::Pin,
task::{Context, Poll},
},
tokio::sync::mpsc,
};

type DecoderState = Box<GzDecoder<bytes::buf::Writer<BytesMut>>>;

/// A chunk of bytes in a [`Body`].
///
/// A chunk represents a block of data in a body. Representing bodies as chunks allows us to append
Expand All @@ -31,6 +37,15 @@ pub enum Chunk {
/// one individual chunk. That stream is effectively "flattened" on-demand, as the `Body`
/// containing it is read.
Channel(mpsc::Receiver<Chunk>),
/// A version of `HttpBody` that assumes that the interior data is gzip-compressed.
CompressedHttpBody(DecoderState, hyper::Body),
}

impl Chunk {
pub fn compressed_body(body: hyper::Body) -> Chunk {
let initial_state = Box::new(GzDecoder::new(BytesMut::new().writer()));
Chunk::CompressedHttpBody(initial_state, body)
}
}

impl From<&[u8]> for Chunk {
Expand Down Expand Up @@ -98,7 +113,7 @@ impl Body {
}

/// Read the entire body into a byte vector.
pub async fn read_into_vec(self) -> Result<Vec<u8>, hyper::Error> {
pub async fn read_into_vec(self) -> Result<Vec<u8>, error::Error> {
let mut body = Box::new(self);
let mut bytes = Vec::new();

Expand All @@ -113,7 +128,7 @@ impl Body {
/// # Panics
///
/// Panics if the body is not valid UTF-8.
pub async fn read_into_string(self) -> Result<String, hyper::Error> {
pub async fn read_into_string(self) -> Result<String, error::Error> {
Ok(String::from_utf8(self.read_into_vec().await?).expect("Body was not UTF-8"))
}
}
Expand Down Expand Up @@ -142,13 +157,13 @@ impl IntoIterator for Body {

impl HttpBody for Body {
type Data = bytes::Bytes;
type Error = hyper::Error;
type Error = error::Error;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
while let Some(chunk) = self.chunks.pop_front() {
while let Some(mut chunk) = self.chunks.pop_front() {
match chunk {
Chunk::HttpBody(mut body) => {
let body_mut = &mut body;
Expand Down Expand Up @@ -199,6 +214,39 @@ impl HttpBody for Body {
}
}
}
Chunk::CompressedHttpBody(ref mut decoder_state, ref mut body) => {
pin_mut!(body);

match body.poll_data(cx) {
Poll::Pending => {
// put the body back, so we can poll it again next time
self.chunks.push_front(chunk);
return Poll::Pending;
}
Poll::Ready(None) => match decoder_state.try_finish() {
Err(e) => return Poll::Ready(Some(Err(e.into()))),
Ok(()) => {
let chunk = decoder_state.get_mut().get_mut().split().freeze();
return Poll::Ready(Some(Ok(chunk)));
}
},
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Some(Ok(bytes))) => {
match decoder_state.write_all(&bytes) {
Err(e) => return Poll::Ready(Some(Err(e.into()))),
Ok(()) => {
decoder_state.flush().unwrap();
let resulting_bytes =
decoder_state.get_mut().get_mut().split().freeze();
// put the body back, so we can poll it again next time
self.chunks.push_front(chunk);

return Poll::Ready(Some(Ok(resulting_bytes)));
}
}
}
}
}
}
}

Expand All @@ -218,8 +266,9 @@ impl HttpBody for Body {
let mut size = 0;
for chunk in self.chunks.iter() {
match chunk {
// If this is a streaming body, immediately give up on the hint.
// If this is a streaming body or a compressed chunk, immediately give up on the hint.
Chunk::Channel(_) => return SizeHint::default(),
Chunk::CompressedHttpBody(_, _) => return SizeHint::default(),
Chunk::HttpBody(body) => {
// An `HttpBody` size hint will either be exact, or wide open. If the latter,
// bail out with a wide-open range.
Expand Down
18 changes: 16 additions & 2 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use {
streaming_body::StreamingBody,
upstream::{PendingRequest, SelectTarget},
wiggle_abi::types::{
BodyHandle, DictionaryHandle, EndpointHandle, PendingRequestHandle, RequestHandle,
ResponseHandle,
BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle, PendingRequestHandle,
RequestHandle, ResponseHandle,
},
},
cranelift_entity::{entity_impl, PrimaryMap},
Expand Down Expand Up @@ -658,6 +658,20 @@ impl Session {
}
}

/// Additional Viceroy-specific metadata for requests.
#[derive(Clone, Debug)]
pub struct ViceroyRequestMetadata {
pub auto_decompress_encodings: ContentEncodings,
}

impl Default for ViceroyRequestMetadata {
fn default() -> Self {
ViceroyRequestMetadata {
auto_decompress_encodings: ContentEncodings::empty(),
}
}
}

#[derive(Clone, Copy, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct AsyncItemHandle(u32);
Expand Down
50 changes: 44 additions & 6 deletions lib/src/upstream.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::{
body::Body, config::Backend, error::Error, headers::filter_outgoing_headers,
wiggle_abi::types::PendingRequestHandle,
body::{Body, Chunk},
config::Backend,
error::Error,
headers::filter_outgoing_headers,
session::ViceroyRequestMetadata,
wiggle_abi::types::{ContentEncodings, PendingRequestHandle},
};
use futures::Future;
use http::{uri, HeaderValue};
use hyper::{client::HttpConnector, Client, HeaderMap, Request, Response, Uri};
use hyper::{client::HttpConnector, header, Client, HeaderMap, Request, Response, Uri};
use std::{
io,
pin::Pin,
Expand All @@ -20,6 +24,11 @@ use tokio::{
use tokio_rustls::{client::TlsStream, TlsConnector};
use webpki::DNSNameRef;

static GZIP_VALUES: [HeaderValue; 2] = [
HeaderValue::from_static("gzip"),
HeaderValue::from_static("x-gzip"),
];

/// A custom Hyper client connector, which is needed to override Hyper's default behavior of
/// connecting to host specified by the request's URI; we instead want to connect to the host
/// specified by our backend configuration, regardless of what the URI says.
Expand Down Expand Up @@ -164,21 +173,50 @@ pub fn send_request(
backend,
);

let try_decompression = req
.extensions()
.get::<ViceroyRequestMetadata>()
.map(|vrm| {
vrm.auto_decompress_encodings
.contains(ContentEncodings::GZIP)
})
.unwrap_or(false);

filter_outgoing_headers(req.headers_mut());
req.headers_mut().insert(hyper::header::HOST, host);
*req.uri_mut() = uri;

async move {
Ok(Client::builder()
let basic_response = Client::builder()
.set_host(false)
.build(connector)
.request(req)
.await
.map_err(|e| {
eprintln!("Error: {:?}", e);
e
})?
.map(Body::from))
})?;

if try_decompression
&& basic_response
.headers()
.get(header::CONTENT_ENCODING)
.map(|x| GZIP_VALUES.contains(x))
.unwrap_or(false)
{
let mut decompressing_response =
basic_response.map(Chunk::compressed_body).map(Body::from);

decompressing_response
.headers_mut()
.remove(header::CONTENT_ENCODING);
decompressing_response
.headers_mut()
.remove(header::CONTENT_LENGTH);
Ok(decompressing_response)
} else {
Ok(basic_response.map(Body::from))
}
}
}

Expand Down
28 changes: 22 additions & 6 deletions lib/src/wiggle_abi/req_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use {
crate::{
error::Error,
session::Session,
session::{Session, ViceroyRequestMetadata},
upstream::{self, PendingRequest},
wiggle_abi::{
fastly_http_req::FastlyHttpReq,
Expand Down Expand Up @@ -470,13 +470,29 @@ impl FastlyHttpReq for Session {

fn auto_decompress_response_set(
&mut self,
_h: RequestHandle,
req_handle: RequestHandle,
encodings: ContentEncodings,
) -> Result<(), Error> {
if u32::from(encodings) == 1 {
unimplemented!("calling auto_decompress_response_set with GZIP has not yet been implemented in Viceroy");
} else {
Ok(())
// NOTE: We're going to hide this flag in the extensions of the request in order to decrease
// the book-keeping burden inside Session. The flag will get picked up later, in `send_request`.
let extensions = &mut self.request_parts_mut(req_handle)?.extensions;

match extensions.get_mut::<ViceroyRequestMetadata>() {
None => {
extensions.insert(ViceroyRequestMetadata {
auto_decompress_encodings: encodings,
// future note: at time of writing, this is the only field of
// this structure, but there is an intention to add more fields.
// When we do, and if/when an error appears, what you're looking
// for is:
// ..Default::default()
});
}
Some(vrm) => {
vrm.auto_decompress_encodings = encodings;
}
}

Ok(())
}
}
Binary file added test-fixtures/data/hello_world.gz
Binary file not shown.
Loading