Skip to content

Commit

Permalink
Add a non-regression test for buffered compression with @defer
Browse files Browse the repository at this point in the history
  • Loading branch information
SimonSapin committed Sep 12, 2022
1 parent 83a683c commit 8a44b79
Show file tree
Hide file tree
Showing 5 changed files with 490 additions and 5 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ uname = "0.1.1"
insta = { version = "1.19.1", features = [ "json", "redactions" ] }
jsonpath_lib = "0.3.0"
maplit = "1.0.2"
memchr = { version = "2.5.0", default-features = false }
mockall = "0.11.2"
once_cell = "1.14.0"
reqwest = { version = "0.11.11", default-features = false, features = [
Expand Down
176 changes: 175 additions & 1 deletion apollo-router/src/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,8 @@ impl<B> MakeSpan<B> for PropagatingMakeSpan {
mod tests {
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use async_compression::tokio::write::GzipEncoder;
use http::header::ACCEPT_ENCODING;
Expand All @@ -884,6 +886,7 @@ mod tests {
use reqwest::StatusCode;
use serde_json::json;
use test_log::test;
use tokio::io::BufReader;
use tower::service_fn;

use super::*;
Expand All @@ -892,6 +895,9 @@ mod tests {
use crate::services::new_service::NewService;
use crate::services::transport;
use crate::services::MULTIPART_DEFER_CONTENT_TYPE;
use crate::test_harness::http_client;
use crate::test_harness::http_client::MaybeMultipart;
use crate::TestHarness;

macro_rules! assert_header {
($response:expr, $header:expr, $expected:expr $(, $msg:expr)?) => {
Expand Down Expand Up @@ -1893,7 +1899,6 @@ mod tests {
#[cfg(unix)]
async fn send_to_unix_socket(addr: &ListenAddr, method: Method, body: &str) -> Vec<u8> {
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Interest;
use tokio::net::UnixStream;

Expand Down Expand Up @@ -2495,4 +2500,173 @@ Content-Type: application/json\r
assert!(value == "one" || value == "two");
}
}

/// A counter of how many GraphQL responses have been sent by an Apollo Router
///
/// When `@defer` is used, it should increment multiple times for a single HTTP request.
#[derive(Clone, Default)]
struct GraphQLResponseCounter(Arc<AtomicU32>);

impl GraphQLResponseCounter {
fn increment(&self) {
self.0.fetch_add(1, Ordering::SeqCst);
}

fn get(&self) -> u32 {
self.0.load(Ordering::SeqCst)
}
}

async fn http_service() -> impl Service<
http::Request<serde_json::Value>,
Response = http::Response<MaybeMultipart<serde_json::Value>>,
Error = BoxError,
> {
let counter = GraphQLResponseCounter::default();
let service = TestHarness::builder()
.configuration_json(json!({
"plugins": {
"experimental.include_subgraph_errors": {
"all": true
}
}
}))
.unwrap()
.supergraph_hook(move |service| {
let counter = counter.clone();
service
.map_response(move |mut response| {
response.response.extensions_mut().insert(counter.clone());
response.map_stream(move |graphql_response| {
counter.increment();
graphql_response
})
})
.boxed()
})
.build_http_service()
.await
.unwrap()
.map_err(Into::into);
let service = http_client::response_decompression(service);
let service = http_client::defer_spec_20220824_multipart(service);
let service = http_client::json(service);
service
}

/// Creates an Apollo Router as an HTTP-level Tower service and makes one request.
async fn make_request(
request_body: serde_json::Value,
) -> http::Response<MaybeMultipart<serde_json::Value>> {
let request = http::Request::builder()
.method(http::Method::POST)
.header("host", "127.0.0.1")
.body(request_body)
.unwrap();
http_service().await.oneshot(request).await.unwrap()
}

fn assert_compressed<B>(response: &http::Response<B>, expected: bool) {
assert_eq!(
response
.extensions()
.get::<http_client::ResponseBodyWasCompressed>()
.unwrap()
.0,
expected
)
}

#[tokio::test]
async fn test_compressed_response() {
let response = make_request(json!({
"query": "
query TopProducts($first: Int) {
topProducts(first: $first) {
upc
name
reviews {
id
product { name }
author { id name }
}
}
}
",
"variables": {"first": 2_u32},
}))
.await;
assert_compressed(&response, true);
let status = response.status().as_u16();
let graphql_response = response.into_body().expect_not_multipart();
assert_eq!(graphql_response["errors"], json!(null));
assert_eq!(status, 200);
}

#[tokio::test]
async fn test_defer_is_not_buffered() {
let mut response = make_request(json!({
"query": "
query TopProducts($first: Int) {
topProducts(first: $first) {
upc
name
reviews {
id
product { name }
... @defer { author { id name } }
}
}
}
",
"variables": {"first": 2_u32},
}))
.await;
assert_compressed(&response, true);
let status = response.status().as_u16();
assert_eq!(status, 200);
let counter: GraphQLResponseCounter = response.extensions_mut().remove().unwrap();
let parts = response.into_body().expect_multipart();

let (parts, counts): (Vec<_>, Vec<_>) =
parts.map(|part| (part, counter.get())).unzip().await;
let parts = serde_json::Value::Array(parts);
assert_eq!(
parts,
json!([
{
"data": {
"topProducts": [
{"upc": "1", "name": "Table", "reviews": null},
{"upc": "2", "name": "Couch", "reviews": null}
]
},
"errors": [
{
"message": "invalid content: Missing key `_entities`!",
"path": ["topProducts", "@"],
"extensions": {
"type": "ExecutionInvalidContent",
"reason": "Missing key `_entities`!"
}
}],
"hasNext": true,
},
{"hasNext": false}
]),
"{}",
serde_json::to_string(&parts).unwrap()
);

// Non-regression test for https://github.com/apollographql/router/issues/1572
//
// With unpatched async-compression 0.3.14 as used by tower-http 0.3.4,
// `counts` is `[2, 2]` since both parts have to be generated on the server side
// before the first one reaches the client.
//
// Conversly, observing the value `1` after receiving the first part
// means the didn’t wait for all parts to be in the compression buffer
// before sending any.
assert_eq!(counts, [1, 2]);
}
}
34 changes: 32 additions & 2 deletions apollo-router/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ use crate::router_factory::YamlSupergraphServiceFactory;
use crate::services::execution;
use crate::services::subgraph;
use crate::services::supergraph;
use crate::services::RouterCreator;
use crate::Schema;

#[cfg(test)]
pub(crate) mod http_client;

/// Builder for the part of an Apollo Router that handles GraphQL requests, as a [`tower::Service`].
///
/// This allows tests, benchmarks, etc
Expand Down Expand Up @@ -165,7 +169,7 @@ impl<'a> TestHarness<'a> {
}

/// Builds the GraphQL service
pub async fn build(self) -> Result<supergraph::BoxCloneService, BoxError> {
async fn build_common(self) -> Result<(Arc<Configuration>, RouterCreator), BoxError> {
let builder = if self.schema.is_none() {
self.subgraph_hook(|subgraph_name, default| match subgraph_name {
"products" => canned::products_subgraph().boxed(),
Expand Down Expand Up @@ -195,16 +199,42 @@ impl<'a> TestHarness<'a> {
let schema = builder.schema.unwrap_or(canned_schema);
let schema = Arc::new(Schema::parse(schema, &config)?);
let router_creator = YamlSupergraphServiceFactory
.create(config, schema, None, Some(builder.extra_plugins))
.create(config.clone(), schema, None, Some(builder.extra_plugins))
.await?;
Ok((config, router_creator))
}

pub async fn build(self) -> Result<supergraph::BoxCloneService, BoxError> {
let (_config, router_creator) = self.build_common().await?;
Ok(tower::service_fn(move |request| {
let service = router_creator.make();
async move { service.oneshot(request).await }
})
.boxed_clone())
}

#[cfg(test)]
pub(crate) async fn build_http_service(self) -> Result<HttpService, BoxError> {
use crate::axum_http_server_factory::make_axum_router;
use crate::axum_http_server_factory::ListenAddrAndRouter;
use crate::router_factory::SupergraphServiceFactory;

let (config, router_creator) = self.build_common().await?;
let web_endpoints = router_creator.web_endpoints();
let routers = make_axum_router(router_creator, &config, web_endpoints)?;
let ListenAddrAndRouter(_listener, router) = routers.main;
Ok(router.boxed())
}
}

/// An HTTP-level service, as would be given to Hyper’s server
#[cfg(test)]
pub(crate) type HttpService = tower::util::BoxService<
http::Request<hyper::Body>,
http::Response<axum::body::BoxBody>,
std::convert::Infallible,
>;

struct SupergraphServicePlugin<F>(F);
struct ExecutionServicePlugin<F>(F);
struct SubgraphServicePlugin<F>(F);
Expand Down
Loading

0 comments on commit 8a44b79

Please sign in to comment.