Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove patched async-compression, disable compression for multipart responses instead #1749

Merged
merged 2 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,3 @@ members = [
# debug = false
strip = "debuginfo"
incremental = false

# Temporary patch to async-compression
# It is used by tower-http's CompressionLayer. The compression code was not handling
# the Poll::Pending result from the underlying stream, so it was accumulating the
# entire compressed response in memory before sending it, which creates issues with
# deferred responses getting received too late
[patch.crates-io]
async-compression = { git = 'https://github.com/geal/async-compression', tag = 'encoder-flush' }
21 changes: 21 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,25 @@ We changed `QueryPlannerResponse` to:

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/1504

### Disable compression of multipart HTTP responses ([Issue #1572](https://github.com/apollographql/router/issues/1572))

For features such a `@defer`, the Router may send a stream of multiple GraphQL responses
in a single HTTP response.
The body of the HTTP response is a single byte stream.
When HTTP compression is used, that byte stream is compressed as a whole.
Due to limitations in current versions of the `async-compression` crate,
[issue #1572](https://github.com/apollographql/router/issues/1572) was a bug where
some GraphQL responses might not be sent to the client until more of them became available.
This buffering yields better compression, but defeats the point of `@defer`.

Our previous work-around involved a patched `async-compression`,
which was not trivial to apply when using the Router as a dependency
since [Cargo patching](https://doc.rust-lang.org/cargo/reference/overriding-dependencies.html)
is done in a project’s root `Cargo.toml`.

The Router now reverts to using unpatched `async-compression`,
and instead disables compression of multipart responses.
We aim to re-enable compression soon, with a proper solution that is being designed in
<https://github.com/Nemo157/async-compression/issues/154>.

## 📚 Documentation
1 change: 1 addition & 0 deletions apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ uname = "0.1.1"
insta = { version = "1.19.1", features = [ "json", "redactions" ] }
jsonpath_lib = "0.3.0"
maplit = "1.0.2"
memchr = { version = "2.5.0", default-features = false }
mockall = "0.11.2"
once_cell = "1.14.0"
reqwest = { version = "0.11.11", default-features = false, features = [
Expand Down
184 changes: 182 additions & 2 deletions apollo-router/src/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ use tokio::sync::Notify;
use tower::util::BoxService;
use tower::BoxError;
use tower::ServiceExt;
use tower_http::compression::predicate::NotForContentType;
use tower_http::compression::CompressionLayer;
use tower_http::compression::DefaultPredicate;
use tower_http::compression::Predicate;
use tower_http::trace::MakeSpan;
use tower_http::trace::TraceLayer;
use tower_service::Service;
Expand Down Expand Up @@ -189,7 +192,11 @@ where
.route(&configuration.server.health_check_path, get(health_check))
.layer(Extension(service_factory))
.layer(cors)
.layer(CompressionLayer::new()); // To compress response body
// Compress the response body, except for multipart responses such as with `@defer`.
// This is a work-around for https://github.com/apollographql/router/issues/1572
.layer(CompressionLayer::new().compress_when(
DefaultPredicate::new().and(NotForContentType::const_new("multipart/")),
));

let listener = configuration.server.listen.clone();
Ok(ListenAddrAndRouter(listener, route))
Expand Down Expand Up @@ -865,6 +872,8 @@ impl<B> MakeSpan<B> for PropagatingMakeSpan {
mod tests {
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use async_compression::tokio::write::GzipEncoder;
use http::header::ACCEPT_ENCODING;
Expand All @@ -884,6 +893,7 @@ mod tests {
use reqwest::StatusCode;
use serde_json::json;
use test_log::test;
use tokio::io::BufReader;
use tower::service_fn;

use super::*;
Expand All @@ -892,6 +902,9 @@ mod tests {
use crate::services::new_service::NewService;
use crate::services::transport;
use crate::services::MULTIPART_DEFER_CONTENT_TYPE;
use crate::test_harness::http_client;
use crate::test_harness::http_client::MaybeMultipart;
use crate::TestHarness;

macro_rules! assert_header {
($response:expr, $header:expr, $expected:expr $(, $msg:expr)?) => {
Expand Down Expand Up @@ -1893,7 +1906,6 @@ mod tests {
#[cfg(unix)]
async fn send_to_unix_socket(addr: &ListenAddr, method: Method, body: &str) -> Vec<u8> {
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Interest;
use tokio::net::UnixStream;

Expand Down Expand Up @@ -2495,4 +2507,172 @@ Content-Type: application/json\r
assert!(value == "one" || value == "two");
}
}

/// A counter of how many GraphQL responses have been sent by an Apollo Router
///
/// When `@defer` is used, it should increment multiple times for a single HTTP request.
#[derive(Clone, Default)]
struct GraphQLResponseCounter(Arc<AtomicU32>);

impl GraphQLResponseCounter {
fn increment(&self) {
self.0.fetch_add(1, Ordering::SeqCst);
}

fn get(&self) -> u32 {
self.0.load(Ordering::SeqCst)
}
}

async fn http_service() -> impl Service<
http::Request<serde_json::Value>,
Response = http::Response<MaybeMultipart<serde_json::Value>>,
Error = BoxError,
> {
let counter = GraphQLResponseCounter::default();
let service = TestHarness::builder()
.configuration_json(json!({
"plugins": {
"experimental.include_subgraph_errors": {
"all": true
}
}
}))
.unwrap()
.supergraph_hook(move |service| {
let counter = counter.clone();
service
.map_response(move |mut response| {
response.response.extensions_mut().insert(counter.clone());
response.map_stream(move |graphql_response| {
counter.increment();
graphql_response
})
})
.boxed()
})
.build_http_service()
.await
.unwrap()
.map_err(Into::into);
let service = http_client::response_decompression(service);
let service = http_client::defer_spec_20220824_multipart(service);
http_client::json(service)
}

/// Creates an Apollo Router as an HTTP-level Tower service and makes one request.
async fn make_request(
request_body: serde_json::Value,
) -> http::Response<MaybeMultipart<serde_json::Value>> {
let request = http::Request::builder()
.method(http::Method::POST)
.header("host", "127.0.0.1")
.body(request_body)
.unwrap();
http_service().await.oneshot(request).await.unwrap()
}

fn assert_compressed<B>(response: &http::Response<B>, expected: bool) {
assert_eq!(
response
.extensions()
.get::<http_client::ResponseBodyWasCompressed>()
.unwrap()
.0,
expected
)
}

#[tokio::test]
async fn test_compressed_response() {
let response = make_request(json!({
"query": "
query TopProducts($first: Int) {
topProducts(first: $first) {
upc
name
reviews {
id
product { name }
author { id name }
}
}
}
",
"variables": {"first": 2_u32},
}))
.await;
assert_compressed(&response, true);
let status = response.status().as_u16();
let graphql_response = response.into_body().expect_not_multipart();
assert_eq!(graphql_response["errors"], json!(null));
assert_eq!(status, 200);
}

#[tokio::test]
async fn test_defer_is_not_buffered() {
let mut response = make_request(json!({
"query": "
query TopProducts($first: Int) {
topProducts(first: $first) {
upc
name
reviews {
id
product { name }
... @defer { author { id name } }
}
}
}
",
"variables": {"first": 2_u32},
}))
.await;
assert_compressed(&response, false);
let status = response.status().as_u16();
assert_eq!(status, 200);
let counter: GraphQLResponseCounter = response.extensions_mut().remove().unwrap();
let parts = response.into_body().expect_multipart();

let (parts, counts): (Vec<_>, Vec<_>) =
parts.map(|part| (part, counter.get())).unzip().await;
let parts = serde_json::Value::Array(parts);
assert_eq!(
parts,
json!([
{
"data": {
"topProducts": [
{"upc": "1", "name": "Table", "reviews": null},
{"upc": "2", "name": "Couch", "reviews": null}
]
},
"errors": [
{
"message": "invalid content: Missing key `_entities`!",
"path": ["topProducts", "@"],
"extensions": {
"type": "ExecutionInvalidContent",
"reason": "Missing key `_entities`!"
}
}],
"hasNext": true,
},
{"hasNext": false}
]),
"{}",
serde_json::to_string(&parts).unwrap()
);

// Non-regression test for https://github.com/apollographql/router/issues/1572
//
// With unpatched async-compression 0.3.14 as used by tower-http 0.3.4,
// `counts` is `[2, 2]` since both parts have to be generated on the server side
// before the first one reaches the client.
//
// Conversly, observing the value `1` after receiving the first part
// means the didn’t wait for all parts to be in the compression buffer
// before sending any.
assert_eq!(counts, [1, 2]);
}
}
34 changes: 32 additions & 2 deletions apollo-router/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ use crate::router_factory::YamlSupergraphServiceFactory;
use crate::services::execution;
use crate::services::subgraph;
use crate::services::supergraph;
use crate::services::RouterCreator;
use crate::Schema;

#[cfg(test)]
pub(crate) mod http_client;

/// Builder for the part of an Apollo Router that handles GraphQL requests, as a [`tower::Service`].
///
/// This allows tests, benchmarks, etc
Expand Down Expand Up @@ -165,7 +169,7 @@ impl<'a> TestHarness<'a> {
}

/// Builds the GraphQL service
pub async fn build(self) -> Result<supergraph::BoxCloneService, BoxError> {
async fn build_common(self) -> Result<(Arc<Configuration>, RouterCreator), BoxError> {
let builder = if self.schema.is_none() {
self.subgraph_hook(|subgraph_name, default| match subgraph_name {
"products" => canned::products_subgraph().boxed(),
Expand Down Expand Up @@ -195,16 +199,42 @@ impl<'a> TestHarness<'a> {
let schema = builder.schema.unwrap_or(canned_schema);
let schema = Arc::new(Schema::parse(schema, &config)?);
let router_creator = YamlSupergraphServiceFactory
.create(config, schema, None, Some(builder.extra_plugins))
.create(config.clone(), schema, None, Some(builder.extra_plugins))
.await?;
Ok((config, router_creator))
}

pub async fn build(self) -> Result<supergraph::BoxCloneService, BoxError> {
let (_config, router_creator) = self.build_common().await?;
Ok(tower::service_fn(move |request| {
let service = router_creator.make();
async move { service.oneshot(request).await }
})
.boxed_clone())
}

#[cfg(test)]
pub(crate) async fn build_http_service(self) -> Result<HttpService, BoxError> {
use crate::axum_http_server_factory::make_axum_router;
use crate::axum_http_server_factory::ListenAddrAndRouter;
use crate::router_factory::SupergraphServiceFactory;

let (config, router_creator) = self.build_common().await?;
let web_endpoints = router_creator.web_endpoints();
let routers = make_axum_router(router_creator, &config, web_endpoints)?;
let ListenAddrAndRouter(_listener, router) = routers.main;
Ok(router.boxed())
}
}

/// An HTTP-level service, as would be given to Hyper’s server
#[cfg(test)]
pub(crate) type HttpService = tower::util::BoxService<
http::Request<hyper::Body>,
http::Response<axum::body::BoxBody>,
std::convert::Infallible,
>;

struct SupergraphServicePlugin<F>(F);
struct ExecutionServicePlugin<F>(F);
struct SubgraphServicePlugin<F>(F);
Expand Down
Loading