From c79e5013a5757ba2b787d7114392e8aa33aa22d6 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 31 Mar 2023 12:16:52 +0200 Subject: [PATCH 1/4] use a parking-lot mutex in Context --- Cargo.lock | 1 + apollo-router/Cargo.toml | 1 + apollo-router/src/context.rs | 19 +++++++++++-------- apollo-router/src/plugins/coprocessor.rs | 16 ++++++++-------- .../src/services/subgraph_service.rs | 10 +++++----- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d682c2047e..04ed9abebd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry-zipkin", "p256 0.12.0", + "parking_lot 0.12.1", "paste", "pin-project-lite", "prometheus", diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index cbe5a08749..b91f0ce8a0 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -203,6 +203,7 @@ yaml-rust = "0.4.5" wsl = "0.1.0" tokio-rustls = "0.23.4" http-serde = "1.1.2" +parking_lot = "0.12.1" [target.'cfg(macos)'.dependencies] uname = "0.1.1" diff --git a/apollo-router/src/context.rs b/apollo-router/src/context.rs index e4f670d93e..e65c360e9a 100644 --- a/apollo-router/src/context.rs +++ b/apollo-router/src/context.rs @@ -10,7 +10,8 @@ use std::time::Instant; use dashmap::mapref::multiple::RefMulti; use dashmap::mapref::multiple::RefMutMulti; use dashmap::DashMap; -use futures::lock::Mutex; +use derivative::Derivative; +use parking_lot::Mutex; use serde::Deserialize; use serde::Serialize; use tower::BoxError; @@ -31,7 +32,8 @@ pub(crate) type Entries = Arc>; /// [`crate::services::SubgraphResponse`] processing. At such times, /// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`] /// functions to minimise the possibility of mis-sequenced updates. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Deserialize, Serialize, Derivative)] +#[derivative(Debug)] pub struct Context { // Allows adding custom entries to the context. entries: Entries, @@ -42,6 +44,7 @@ pub struct Context { pub(crate) created_at: Instant, #[serde(skip)] + #[derivative(Debug = "ignore")] busy_timer: Arc>, } @@ -186,18 +189,18 @@ impl Context { } /// Notify the busy timer that we're waiting on a network request - pub(crate) async fn enter_active_request(&self) { - self.busy_timer.lock().await.increment_active_requests() + pub(crate) fn enter_active_request(&self) { + self.busy_timer.lock().increment_active_requests() } /// Notify the busy timer that we stopped waiting on a network request - pub(crate) async fn leave_active_request(&self) { - self.busy_timer.lock().await.decrement_active_requests() + pub(crate) fn leave_active_request(&self) { + self.busy_timer.lock().decrement_active_requests() } /// How much time was spent working on the request - pub(crate) async fn busy_time(&self) -> Duration { - self.busy_timer.lock().await.current() + pub(crate) fn busy_time(&self) -> Duration { + self.busy_timer.lock().current() } } diff --git a/apollo-router/src/plugins/coprocessor.rs b/apollo-router/src/plugins/coprocessor.rs index f959da6581..e77c31c645 100644 --- a/apollo-router/src/plugins/coprocessor.rs +++ b/apollo-router/src/plugins/coprocessor.rs @@ -471,9 +471,9 @@ where }; tracing::debug!(?payload, "externalized output"); - request.context.enter_active_request().await; + request.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - request.context.leave_active_request().await; + request.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; @@ -593,9 +593,9 @@ where // Second, call our co-processor and get a reply. tracing::debug!(?payload, "externalized output"); - response.context.enter_active_request().await; + response.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - response.context.leave_active_request().await; + response.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; @@ -676,9 +676,9 @@ where }; tracing::debug!(?payload, "externalized output"); - request.context.enter_active_request().await; + request.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - request.context.leave_active_request().await; + request.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; validate_coprocessor_output(&co_processor_output, PipelineStep::SubgraphRequest)?; @@ -801,9 +801,9 @@ where }; tracing::debug!(?payload, "externalized output"); - response.context.enter_active_request().await; + response.context.enter_active_request(); let co_processor_result = payload.call(http_client, &coprocessor_url).await; - response.context.leave_active_request().await; + response.context.leave_active_request(); tracing::debug!(?co_processor_result, "co-processor returned"); let co_processor_output = co_processor_result?; diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 08dc2f4095..e86aa6524b 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -329,13 +329,13 @@ async fn call_http( let cloned_service_name = service_name.clone(); let cloned_context = context.clone(); let (parts, body) = async move { - cloned_context.enter_active_request().await; + cloned_context.enter_active_request(); let response = match client .call(request) .await { Err(err) => { tracing::error!(fetch_error = format!("{err:?}").as_str()); - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); return Err(FetchError::SubrequestHttpError { service: service_name.clone(), @@ -358,7 +358,7 @@ async fn call_http( if !content_type_str.contains(APPLICATION_JSON.essence_str()) && !content_type_str.contains(GRAPHQL_JSON_RESPONSE_HEADER_VALUE) { - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); return if !parts.status.is_success() { @@ -384,7 +384,7 @@ async fn call_http( .instrument(tracing::debug_span!("aggregate_response_data")) .await { Err(err) => { - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); tracing::error!(fetch_error = format!("{err:?}").as_str()); @@ -396,7 +396,7 @@ async fn call_http( }, Ok(body) => body, }; - cloned_context.leave_active_request().await; + cloned_context.leave_active_request(); Ok((parts, body)) }.instrument(subgraph_req_span).await?; From 1b7e38e233e407e45b35b86302a07c2b8b0c03cb Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 31 Mar 2023 13:39:18 +0200 Subject: [PATCH 2/4] fix --- apollo-router/src/axum_factory/axum_http_server_factory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index 74c0ac1601..7e5b3c4011 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -436,7 +436,7 @@ async fn handle_graphql( let context = request.context.clone(); let res = service.oneshot(request).await; - let dur = context.busy_time().await; + let dur = context.busy_time(); let processing_seconds = dur.as_secs_f64(); tracing::info!(histogram.apollo_router_processing_time = processing_seconds,); From 8105e65f4bdb63753e02be449955d1257afbc1b7 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 17 Apr 2023 12:42:17 +0200 Subject: [PATCH 3/4] changeset --- .changesets/fix_geal_context_mutex.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changesets/fix_geal_context_mutex.md diff --git a/.changesets/fix_geal_context_mutex.md b/.changesets/fix_geal_context_mutex.md new file mode 100644 index 0000000000..74bc0f00e6 --- /dev/null +++ b/.changesets/fix_geal_context_mutex.md @@ -0,0 +1,5 @@ +### use a parking-lot mutex in Context [Issue #2751](https://github.com/apollographql/router/issues/2751) + +The context requires synchronized access to the busy timer, and precedently we used a futures aware mutex for that, but those are susceptible to contention. This replaces that mutex with a parking-lot synchronous mutex that is much faster. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2885 \ No newline at end of file From e7fe7cfc2c8d4de253b877e5a7b85aa1659ad3ee Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 17 Apr 2023 12:45:11 +0200 Subject: [PATCH 4/4] Update fix_geal_context_mutex.md --- .changesets/fix_geal_context_mutex.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changesets/fix_geal_context_mutex.md b/.changesets/fix_geal_context_mutex.md index 74bc0f00e6..be40b646a7 100644 --- a/.changesets/fix_geal_context_mutex.md +++ b/.changesets/fix_geal_context_mutex.md @@ -1,5 +1,5 @@ -### use a parking-lot mutex in Context [Issue #2751](https://github.com/apollographql/router/issues/2751) +### use a parking-lot mutex in Context ([Issue #2751](https://github.com/apollographql/router/issues/2751)) The context requires synchronized access to the busy timer, and precedently we used a futures aware mutex for that, but those are susceptible to contention. This replaces that mutex with a parking-lot synchronous mutex that is much faster. -By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2885 \ No newline at end of file +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2885