From 31fe1143abe298dc89ffae7c44a2a2ba57af4b79 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 29 Aug 2022 15:17:58 +0200 Subject: [PATCH] refactor SupergraphService (#1615) Follow up on #1610 rustfmt does not apply anymore on supergraph_service.rs, which is a hint that the code is way too complex. This PR moves the `Service:call` body to an async function, and splits part of the functionality --- NEXT_CHANGELOG.md | 9 + .../src/services/supergraph_service.rs | 401 +++++++++--------- 2 files changed, 221 insertions(+), 189 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 942a264d7b..a0cef95a43 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -29,4 +29,13 @@ By [@USERNAME](https://github.com/USERNAME) in https://github.com/apollographql/ ## 🚀 Features ## 🐛 Fixes ## 🛠 Maintenance + +### Refactor `SupergraphService` ([PR #1615](https://github.com/apollographql/router/issues/1615)) + +The `SupergrapHService` code became too complex, so much that `rsutfmt` could not modify it anymore. +This breaks up the code in more manageable functions. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/1615 + + ## 📚 Documentation diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index 12de60dd35..4f26029e8a 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -19,6 +19,9 @@ use mediatype::names::MULTIPART; use mediatype::MediaType; use mediatype::MediaTypeList; use opentelemetry::trace::SpanKind; +use serde_json_bytes::ByteString; +use serde_json_bytes::Map; +use serde_json_bytes::Value; use tower::util::BoxService; use tower::BoxError; use tower::ServiceBuilder; @@ -47,7 +50,9 @@ use crate::response::IncrementalResponse; use crate::router_factory::SupergraphServiceFactory; use crate::services::layers::apq::APQLayer; use crate::services::layers::ensure_query_presence::EnsureQueryPresence; +use crate::spec::Query; use crate::Configuration; +use crate::Context; use crate::ExecutionRequest; use crate::ExecutionResponse; use crate::QueryPlannerRequest; @@ -105,206 +110,140 @@ where fn call(&mut self, req: SupergraphRequest) -> Self::Future { // Consume our cloned services and allow ownership to be transferred to the async block. - let mut planning = self.ready_query_planner_service.take().unwrap(); + let planning = self.ready_query_planner_service.take().unwrap(); let execution = self.execution_service_factory.new_service(); let schema = self.schema.clone(); let context_cloned = req.context.clone(); - let fut = async move { - let context = req.context; - let body = req.originating_request.body(); - let variables = body.variables.clone(); - let QueryPlannerResponse { content, context } = planning - .call( - QueryPlannerRequest::builder() - .query( - body.query - .clone() - .expect("the query presence was already checked by a plugin"), - ) - .and_operation_name(body.operation_name.clone()) - .context(context) - .build(), - ) - .instrument(tracing::info_span!("query_planning", - graphql.document = body.query.clone().expect("the query presence was already checked by a plugin").as_str(), - graphql.operation.name = body.operation_name.clone().unwrap_or_default().as_str(), - "otel.kind" = %SpanKind::Internal - )) - .await?; - - match content { - QueryPlannerContent::Introspection { response } => Ok( - SupergraphResponse::new_from_graphql_response(*response, context), - ), - QueryPlannerContent::IntrospectionDisabled => { - let mut resp = http::Response::new( - once(ready( - graphql::Response::builder() - .errors(vec![crate::error::Error::builder() - .message(String::from("introspection has been disabled")) - .build()]) - .build(), - )) - .boxed(), - ); - *resp.status_mut() = StatusCode::BAD_REQUEST; - - Ok(SupergraphResponse { - response: resp, - context, - }) - } - QueryPlannerContent::Plan { query, plan } => { - let can_be_deferred = plan.root.contains_defer(); - - if can_be_deferred && !accepts_multipart(req.originating_request.headers()) { - tracing::error!("tried to send a defer request without accept: multipart/mixed"); - let mut resp = http::Response::new( - once(ready( - graphql::Response::builder() - .errors(vec![crate::error::Error::builder() - .message(String::from("the router received a query with the @defer directive but the client does not accept multipart/mixed HTTP responses")) - .build()]) - .build(), - )) - .boxed(), - ); - *resp.status_mut() = StatusCode::BAD_REQUEST; - Ok(SupergraphResponse { - response: resp, - context, - }) - } else if let Some(err) = query.validate_variables(body, &schema).err() { - let mut res = SupergraphResponse::new_from_graphql_response(err, context); - *res.response.status_mut() = StatusCode::BAD_REQUEST; - Ok(res) - } else { - let operation_name = body.operation_name.clone(); - - let ExecutionResponse { response, context } = execution - .oneshot( - ExecutionRequest::builder() - .originating_request(req.originating_request) - .query_plan(plan) - .context(context) - .build(), - ) - .await?; - - let (parts, response_stream) = response.into_parts(); - - let stream = response_stream - .map(move |mut response: Response| { - tracing::debug_span!("format_response").in_scope(|| { - query.format_response( - &mut response, - operation_name.as_deref(), - variables.clone(), - schema.api_schema(), - ) - }); - - match (response.path.as_ref(), response.data.as_ref()) { - (None, _) | (_, None) => { - if can_be_deferred { - response.has_next = Some(true); - } - - response - } - // if the deferred response specified a path, we must extract the - //values matched by that path and create a separate response for - //each of them. - // While { "data": { "a": { "b": 1 } } } and { "data": { "b": 1 }, "path: ["a"] } - // would merge in the same ways, some clients will generate code - // that checks the specific type of the deferred response at that - // path, instead of starting from the root object, so to support - // this, we extract the value at that path. - // In particular, that means that a deferred fragment in an object - // under an array would generate one response par array element - (Some(response_path), Some(response_data)) => { - let mut sub_responses = Vec::new(); - response_data.select_values_and_paths( - response_path, - |path, value| { - sub_responses - .push((path.clone(), value.clone())); - }, - ); - - Response::builder() - .has_next(true) - .incremental( - sub_responses - .into_iter() - .map(move |(path, data)| { - IncrementalResponse::builder() - .and_label( - response.label.clone(), - ) - .data(data) - .path(path) - .errors(response.errors.clone()) - .extensions( - response.extensions.clone(), - ) - .build() - }) - .collect(), - ) - .build() - } - } - }); - - Ok(SupergraphResponse { - context, - response: http::Response::from_parts( - parts, - if can_be_deferred { - stream.chain(once(ready(Response::builder().has_next(false).build()))).left_stream() - } else { - stream.right_stream() - }.in_current_span() - .boxed(), - ) - }) + let fut = + service_call(planning, execution, schema, req).or_else(|error: BoxError| async move { + let errors = vec![crate::error::Error { + message: error.to_string(), + ..Default::default() + }]; + let status_code = match error.downcast_ref::() { + Some(crate::error::CacheResolverError::RetrievalError(retrieval_error)) + if matches!( + retrieval_error.deref().downcast_ref::(), + Some(QueryPlannerError::SpecError(_)) + | Some(QueryPlannerError::SchemaValidationErrors(_)) + ) => + { + StatusCode::BAD_REQUEST } - } - } - } - .or_else(|error: BoxError| async move { - let errors = vec![crate::error::Error { - message: error.to_string(), - ..Default::default() - }]; - let status_code = match error.downcast_ref::() { - Some(crate::error::CacheResolverError::RetrievalError(retrieval_error)) - if matches!( - retrieval_error.deref().downcast_ref::(), - Some(QueryPlannerError::SpecError(_)) - | Some(QueryPlannerError::SchemaValidationErrors(_)) - ) => - { - StatusCode::BAD_REQUEST - } - _ => StatusCode::INTERNAL_SERVER_ERROR, - }; - - Ok(SupergraphResponse::builder() - .errors(errors) - .status_code(status_code) - .context(context_cloned) - .build() - .expect("building a response like this should not fail")) - }); + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + + Ok(SupergraphResponse::builder() + .errors(errors) + .status_code(status_code) + .context(context_cloned) + .build() + .expect("building a response like this should not fail")) + }); Box::pin(fut) } } +async fn service_call( + planning: CachingQueryPlanner, + execution: ExecutionService, + schema: Arc, + req: SupergraphRequest, +) -> Result +where + ExecutionService: + Service + Send, +{ + let context = req.context; + let body = req.originating_request.body(); + let variables = body.variables.clone(); + let QueryPlannerResponse { content, context } = plan_query(planning, body, context).await?; + + match content { + QueryPlannerContent::Introspection { response } => Ok( + SupergraphResponse::new_from_graphql_response(*response, context), + ), + QueryPlannerContent::IntrospectionDisabled => { + let mut response = SupergraphResponse::new_from_graphql_response( + graphql::Response::builder() + .errors(vec![crate::error::Error::builder() + .message(String::from("introspection has been disabled")) + .build()]) + .build(), + context, + ); + *response.response.status_mut() = StatusCode::BAD_REQUEST; + Ok(response) + } + QueryPlannerContent::Plan { query, plan } => { + let can_be_deferred = plan.root.contains_defer(); + + if can_be_deferred && !accepts_multipart(req.originating_request.headers()) { + let mut response = SupergraphResponse::new_from_graphql_response(graphql::Response::builder() + .errors(vec![crate::error::Error::builder() + .message(String::from("the router received a query with the @defer directive but the client does not accept multipart/mixed HTTP responses")) + .build()]) + .build(), context); + *response.response.status_mut() = StatusCode::BAD_REQUEST; + Ok(response) + } else if let Some(err) = query.validate_variables(body, &schema).err() { + let mut res = SupergraphResponse::new_from_graphql_response(err, context); + *res.response.status_mut() = StatusCode::BAD_REQUEST; + Ok(res) + } else { + let operation_name = body.operation_name.clone(); + + let execution_response = execution + .oneshot( + ExecutionRequest::builder() + .originating_request(req.originating_request) + .query_plan(plan) + .context(context) + .build(), + ) + .await?; + + process_execution_response( + execution_response, + query, + operation_name, + variables, + schema, + can_be_deferred, + ) + } + } + } +} + +async fn plan_query( + mut planning: CachingQueryPlanner, + body: &graphql::Request, + context: Context, +) -> Result { + planning + .call( + QueryPlannerRequest::builder() + .query( + body.query + .clone() + .expect("the query presence was already checked by a plugin"), + ) + .and_operation_name(body.operation_name.clone()) + .context(context) + .build(), + ) + .instrument(tracing::info_span!("query_planning", + graphql.document = body.query.clone().expect("the query presence was already checked by a plugin").as_str(), + graphql.operation.name = body.operation_name.clone().unwrap_or_default().as_str(), + "otel.kind" = %SpanKind::Internal + )) + .await +} + fn accepts_multipart(headers: &HeaderMap) -> bool { let multipart_mixed = MediaType::new(MULTIPART, MIXED); @@ -320,6 +259,90 @@ fn accepts_multipart(headers: &HeaderMap) -> bool { }) } +fn process_execution_response( + execution_response: ExecutionResponse, + query: Arc, + operation_name: Option, + variables: Map, + schema: Arc, + can_be_deferred: bool, +) -> Result { + let ExecutionResponse { response, context } = execution_response; + + let (parts, response_stream) = response.into_parts(); + + let stream = response_stream.map(move |mut response: Response| { + tracing::debug_span!("format_response").in_scope(|| { + query.format_response( + &mut response, + operation_name.as_deref(), + variables.clone(), + schema.api_schema(), + ) + }); + + match (response.path.as_ref(), response.data.as_ref()) { + (None, _) | (_, None) => { + if can_be_deferred { + response.has_next = Some(true); + } + + response + } + // if the deferred response specified a path, we must extract the + //values matched by that path and create a separate response for + //each of them. + // While { "data": { "a": { "b": 1 } } } and { "data": { "b": 1 }, "path: ["a"] } + // would merge in the same ways, some clients will generate code + // that checks the specific type of the deferred response at that + // path, instead of starting from the root object, so to support + // this, we extract the value at that path. + // In particular, that means that a deferred fragment in an object + // under an array would generate one response par array element + (Some(response_path), Some(response_data)) => { + let mut sub_responses = Vec::new(); + response_data.select_values_and_paths(response_path, |path, value| { + sub_responses.push((path.clone(), value.clone())); + }); + + Response::builder() + .has_next(true) + .incremental( + sub_responses + .into_iter() + .map(move |(path, data)| { + IncrementalResponse::builder() + .and_label(response.label.clone()) + .data(data) + .path(path) + .errors(response.errors.clone()) + .extensions(response.extensions.clone()) + .build() + }) + .collect(), + ) + .build() + } + } + }); + + Ok(SupergraphResponse { + context, + response: http::Response::from_parts( + parts, + if can_be_deferred { + stream + .chain(once(ready(Response::builder().has_next(false).build()))) + .left_stream() + } else { + stream.right_stream() + } + .in_current_span() + .boxed(), + ), + }) +} + /// Builder which generates a plugin pipeline. /// /// This is at the heart of the delegation of responsibility model for the router. A schema,