Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add map_first_graphql_response, unify execution & supergraph response type #1708

Merged
merged 6 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,46 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
# [x.x.x] (unreleased) - 2022-mm-dd
> Important: X breaking changes below, indicated by **❗ BREAKING ❗**
## ❗ BREAKING ❗

### Unified supergraph and execution response types

`apollo_router::services::supergraph::Response` and
`apollo_router::services::execution::Response` were two structs with identical fields
and almost-identical methods.
The main difference was that builders were fallible for the former but not the latter.

They are now the same type (with one location a `type` alias of the other), with fallible builders.
Callers may need to add either a operator `?` (in plugins) or an `.unwrap()` call (in tests).

```diff
let response = execution::Response::builder()
.error(error)
.status_code(StatusCode::BAD_REQUEST)
.context(req.context)
- .build();
+ .build()?;
```

By [@SimonSapin](https://github.com/SimonSapin)

## 🚀 Features

### New plugin helper: `map_first_graphql_response`

In supergraph and execution services, the service response contains
not just one GraphQL response but a stream of them,
in order to support features such as `@defer`.

This new method of `ServiceExt` and `ServiceBuilderExt` in `apollo_router::layers`
wraps a service and call `callback` when the first GraphQL response
garypen marked this conversation as resolved.
Show resolved Hide resolved
in the stream returned by the inner service becomes available.
The callback can then modify the HTTP parts (headers, status code, etc)
garypen marked this conversation as resolved.
Show resolved Hide resolved
or the first GraphQL response before returning them.

See the doc-comments in `apollo-router/src/layers/mod.rs` for more.

By [@SimonSapin](https://github.com/SimonSapin)

## 🐛 Fixes
## 🛠 Maintenance
## 📚 Documentation
Expand Down
9 changes: 6 additions & 3 deletions apollo-router/src/layers/async_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ mod async_checkpoint_tests {
move |_req: crate::ExecutionRequest| {
Ok(ExecutionResponse::fake_builder()
.label(expected_label.to_string())
.build())
.build()
.unwrap())
},
);

Expand Down Expand Up @@ -200,7 +201,8 @@ mod async_checkpoint_tests {
.returning(move |_req| {
Ok(ExecutionResponse::fake_builder()
.label(expected_label.to_string())
.build())
.build()
.unwrap())
});
router_service
});
Expand Down Expand Up @@ -236,7 +238,8 @@ mod async_checkpoint_tests {
Ok(ControlFlow::Break(
ExecutionResponse::fake_builder()
.label("returned_before_mock_service".to_string())
.build(),
.build()
.unwrap(),
))
})
.layer(router_service);
Expand Down
118 changes: 118 additions & 0 deletions apollo-router/src/layers/map_first_graphql_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#![allow(missing_docs)]

use std::future::ready;
use std::task::Poll;

use futures::future::BoxFuture;
use futures::stream::once;
use futures::FutureExt;
use futures::StreamExt;
use tower::Layer;
use tower::Service;

use crate::graphql;
use crate::services::supergraph;
use crate::Context;

pub struct MapFirstGraphqlResponseLayer<Callback> {
pub(super) callback: Callback,
}

pub struct MapFirstGraphqlResponseService<InnerService, Callback> {
inner: InnerService,
callback: Callback,
}

impl<InnerService, Callback> Layer<InnerService> for MapFirstGraphqlResponseLayer<Callback>
where
Callback: Clone,
{
type Service = MapFirstGraphqlResponseService<InnerService, Callback>;

fn layer(&self, inner: InnerService) -> Self::Service {
MapFirstGraphqlResponseService {
inner,
callback: self.callback.clone(),
}
}
}

impl<InnerService, Callback, Request> Service<Request>
for MapFirstGraphqlResponseService<InnerService, Callback>
where
InnerService: Service<Request, Response = supergraph::Response>,
InnerService::Future: Send + 'static,
Callback: FnOnce(
Context,
http::response::Parts,
graphql::Response,
) -> (http::response::Parts, graphql::Response)
+ Clone
+ Send
+ 'static,
{
type Response = supergraph::Response;
type Error = InnerService::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
let future = self.inner.call(request);
let callback = self.callback.clone();
async move {
let supergraph_response = future.await?;
let context = supergraph_response.context;
let (mut parts, mut stream) = supergraph_response.response.into_parts();
if let Some(first) = stream.next().await {
let (new_parts, new_first) = callback(context.clone(), parts, first);
parts = new_parts;
stream = once(ready(new_first)).chain(stream).boxed();
};
Ok(supergraph::Response {
context,
response: http::Response::from_parts(parts, stream),
})
}
.boxed()
}
}

#[cfg(test)]
mod tests {
use tower::ServiceExt;

use super::*;
use crate::layers::ServiceExt as _;

#[tokio::test]
async fn test_map_first_graphql_response() {
assert_eq!(
crate::TestHarness::builder()
.execution_hook(|service| {
service
.map_first_graphql_response(|_context, http_parts, mut graphql_response| {
graphql_response
.errors
.push(graphql::Error::builder().message("oh no!").build());
(http_parts, graphql_response)
})
.boxed()
})
.build()
.await
.unwrap()
.oneshot(supergraph::Request::canned_builder().build().unwrap())
.await
.unwrap()
.next_response()
.await
.unwrap()
.errors[0]
.message,
"oh no!"
);
}
}
149 changes: 147 additions & 2 deletions apollo-router/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ use tower::ServiceBuilder;
use tower_service::Service;
use tracing::Span;

use self::map_first_graphql_response::MapFirstGraphqlResponseLayer;
use self::map_first_graphql_response::MapFirstGraphqlResponseService;
use crate::graphql;
use crate::layers::async_checkpoint::AsyncCheckpointLayer;
use crate::layers::instrument::InstrumentLayer;
use crate::layers::map_future_with_request_data::MapFutureWithRequestDataLayer;
use crate::layers::map_future_with_request_data::MapFutureWithRequestDataService;
use crate::layers::sync_checkpoint::CheckpointLayer;

pub mod map_future_with_request_data;
use crate::services::supergraph;
use crate::Context;

pub mod async_checkpoint;
pub mod instrument;
pub mod map_first_graphql_response;
pub mod map_future_with_request_data;
pub mod sync_checkpoint;

pub(crate) const DEFAULT_BUFFER_SIZE: usize = 20_000;
Expand Down Expand Up @@ -199,6 +204,70 @@ pub trait ServiceBuilderExt<L>: Sized {
self.layer(InstrumentLayer::new(span_fn))
}

/// Maps HTTP parts, as well as the first GraphQL response, to different values.
///
/// In supergraph and execution services, the service response contains
/// not just one GraphQL response but a stream of them,
/// in order to support features such as `@defer`.
///
/// This method wraps a service and call `callback` when the first GraphQL response
abernix marked this conversation as resolved.
Show resolved Hide resolved
/// in the stream returned by the inner service becomes available.
/// The callback can then modify the HTTP parts (headers, status code, etc)
garypen marked this conversation as resolved.
Show resolved Hide resolved
/// or the first GraphQL response before returning them.
///
/// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
/// In order to inspect or modify all GraphQL responses,
/// consider using [`map_response`][tower::ServiceExt::map_response]
/// together with [`supergraph::Response::map_stream`] instead.
/// (See the example in `map_stream`’s documentation.)
/// In that case however HTTP parts cannot be modified because they may have already been sent.
///
/// # Example
///
/// ```
/// use apollo_router::services::supergraph;
/// use apollo_router::layers::ServiceBuilderExt as _;
/// use tower::ServiceExt as _;
///
/// struct ExamplePlugin;
///
/// #[async_trait::async_trait]
/// impl apollo_router::plugin::Plugin for ExamplePlugin {
/// # type Config = ();
/// # async fn new(
/// # _init: apollo_router::plugin::PluginInit<Self::Config>,
/// # ) -> Result<Self, tower::BoxError> {
/// # Ok(Self)
/// # }
/// // …
/// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
/// tower::ServiceBuilder::new()
/// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
/// // Something interesting here
/// (http_parts, graphql_response)
/// })
/// .service(inner)
/// .boxed()
/// }
/// }
/// ```
fn map_first_graphql_response<Callback>(
self,
callback: Callback,
) -> ServiceBuilder<Stack<MapFirstGraphqlResponseLayer<Callback>, L>>
where
Callback: FnOnce(
Context,
http::response::Parts,
graphql::Response,
) -> (http::response::Parts, graphql::Response)
+ Clone
+ Send
+ 'static,
{
self.layer(MapFirstGraphqlResponseLayer { callback })
}

/// Similar to map_future but also providing an opportunity to extract information out of the
/// request for use when constructing the response.
///
Expand Down Expand Up @@ -260,7 +329,83 @@ impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
}

/// Extension trait for [`Service`].
///
/// Importing both this trait and [`tower::ServiceExt`] could lead a name collision error.
/// To work around that, use `as _` syntax to make a trait’s methods available in a module
/// without assigning it a name in that module’s namespace.
///
/// ```
/// use apollo_router::layers::ServiceExt as _;
/// use tower::ServiceExt as _;
/// ```
pub trait ServiceExt<Request>: Service<Request> {
/// Maps HTTP parts, as well as the first GraphQL response, to different values.
///
/// In supergraph and execution services, the service response contains
/// not just one GraphQL response but a stream of them,
/// in order to support features such as `@defer`.
///
/// This method wraps a service and call `callback` when the first GraphQL response
/// in the stream returned by the inner service becomes available.
/// The callback can then modify the HTTP parts (headers, status code, etc)
/// or the first GraphQL response before returning them.
///
/// Note that any subsequent GraphQL responses after the first will be forwarded unmodified.
/// In order to inspect or modify all GraphQL responses,
/// consider using [`map_response`][tower::ServiceExt::map_response]
/// together with [`supergraph::Response::map_stream`] instead.
/// (See the example in `map_stream`’s documentation.)
/// In that case however HTTP parts cannot be modified because they may have already been sent.
///
/// # Example
///
/// ```
/// use apollo_router::services::supergraph;
/// use apollo_router::layers::ServiceExt as _;
/// use tower::ServiceExt as _;
///
/// struct ExamplePlugin;
///
/// #[async_trait::async_trait]
/// impl apollo_router::plugin::Plugin for ExamplePlugin {
/// # type Config = ();
/// # async fn new(
/// # _init: apollo_router::plugin::PluginInit<Self::Config>,
/// # ) -> Result<Self, tower::BoxError> {
/// # Ok(Self)
/// # }
/// // …
/// fn supergraph_service(&self, inner: supergraph::BoxService) -> supergraph::BoxService {
/// inner
/// .map_first_graphql_response(|context, mut http_parts, mut graphql_response| {
/// // Something interesting here
/// (http_parts, graphql_response)
/// })
/// .boxed()
/// }
/// }
/// ```
fn map_first_graphql_response<Callback>(
self,
callback: Callback,
) -> MapFirstGraphqlResponseService<Self, Callback>
where
Self: Sized + Service<Request, Response = supergraph::Response>,
<Self as Service<Request>>::Future: Send + 'static,
Callback: FnOnce(
Context,
http::response::Parts,
graphql::Response,
) -> (http::response::Parts, graphql::Response)
+ Clone
+ Send
+ 'static,
{
ServiceBuilder::new()
.map_first_graphql_response(callback)
.service(self)
}

/// Similar to map_future but also providing an opportunity to extract information out of the
/// request for use when constructing the response.
///
Expand Down
Loading