Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use a parking-lot mutex in Context #2885

Merged
merged 8 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changesets/fix_geal_context_mutex.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### use a parking-lot mutex in Context ([Issue #2751](https://github.com/apollographql/router/issues/2751))

The context requires synchronized access to the busy timer. Previously we used a futures-aware mutex for that, but such mutexes are susceptible to contention. This change replaces it with a synchronous `parking_lot` mutex, which is much faster.

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2885
2 changes: 1 addition & 1 deletion apollo-router/src/axum_factory/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ async fn handle_graphql(
let context = request.context.clone();

let res = service.oneshot(request).await;
let dur = context.busy_time().await;
let dur = context.busy_time();
let processing_seconds = dur.as_secs_f64();

tracing::info!(histogram.apollo_router_processing_time = processing_seconds,);
Expand Down
19 changes: 11 additions & 8 deletions apollo-router/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::time::Instant;
use dashmap::mapref::multiple::RefMulti;
use dashmap::mapref::multiple::RefMutMulti;
use dashmap::DashMap;
use futures::lock::Mutex;
use derivative::Derivative;
use parking_lot::Mutex;
use serde::Deserialize;
use serde::Serialize;
use tower::BoxError;
Expand All @@ -34,7 +35,8 @@ pub(crate) type Entries = Arc<DashMap<String, Value>>;
/// [`crate::services::SubgraphResponse`] processing. At such times,
/// plugins should restrict themselves to the [`Context::get`] and [`Context::upsert`]
/// functions to minimise the possibility of mis-sequenced updates.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Deserialize, Serialize, Derivative)]
#[derivative(Debug)]
pub struct Context {
// Allows adding custom entries to the context.
entries: Entries,
Expand All @@ -48,6 +50,7 @@ pub struct Context {
pub(crate) created_at: Instant,

#[serde(skip)]
#[derivative(Debug = "ignore")]
busy_timer: Arc<Mutex<BusyTimer>>,
}

Expand Down Expand Up @@ -193,18 +196,18 @@ impl Context {
}

/// Notify the busy timer that we're waiting on a network request
pub(crate) async fn enter_active_request(&self) {
self.busy_timer.lock().await.increment_active_requests()
pub(crate) fn enter_active_request(&self) {
self.busy_timer.lock().increment_active_requests()
}

/// Notify the busy timer that we stopped waiting on a network request
pub(crate) async fn leave_active_request(&self) {
self.busy_timer.lock().await.decrement_active_requests()
pub(crate) fn leave_active_request(&self) {
self.busy_timer.lock().decrement_active_requests()
}

/// How much time was spent working on the request
pub(crate) async fn busy_time(&self) -> Duration {
self.busy_timer.lock().await.current()
pub(crate) fn busy_time(&self) -> Duration {
self.busy_timer.lock().current()
}
}

Expand Down
16 changes: 8 additions & 8 deletions apollo-router/src/plugins/coprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,9 @@ where
};

tracing::debug!(?payload, "externalized output");
request.context.enter_active_request().await;
request.context.enter_active_request();
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
request.context.leave_active_request().await;
request.context.leave_active_request();
tracing::debug!(?co_processor_result, "co-processor returned");
let co_processor_output = co_processor_result?;

Expand Down Expand Up @@ -642,9 +642,9 @@ where

// Second, call our co-processor and get a reply.
tracing::debug!(?payload, "externalized output");
response.context.enter_active_request().await;
response.context.enter_active_request();
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
response.context.leave_active_request().await;
response.context.leave_active_request();
tracing::debug!(?co_processor_result, "co-processor returned");
let co_processor_output = co_processor_result?;

Expand Down Expand Up @@ -728,9 +728,9 @@ where
};

tracing::debug!(?payload, "externalized output");
request.context.enter_active_request().await;
request.context.enter_active_request();
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
request.context.leave_active_request().await;
request.context.leave_active_request();
tracing::debug!(?co_processor_result, "co-processor returned");
let co_processor_output = co_processor_result?;
validate_coprocessor_output(&co_processor_output, PipelineStep::SubgraphRequest)?;
Expand Down Expand Up @@ -858,9 +858,9 @@ where
};

tracing::debug!(?payload, "externalized output");
response.context.enter_active_request().await;
response.context.enter_active_request();
let co_processor_result = payload.call(http_client, &coprocessor_url).await;
response.context.leave_active_request().await;
response.context.leave_active_request();
tracing::debug!(?co_processor_result, "co-processor returned");
let co_processor_output = co_processor_result?;

Expand Down
10 changes: 5 additions & 5 deletions apollo-router/src/services/subgraph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,13 @@ async fn call_http(
let cloned_service_name = service_name.clone();
let cloned_context = context.clone();
let (parts, body) = async move {
cloned_context.enter_active_request().await;
cloned_context.enter_active_request();
let response = match client
.call(request)
.await {
Err(err) => {
tracing::error!(fetch_error = format!("{err:?}").as_str());
cloned_context.leave_active_request().await;
cloned_context.leave_active_request();

return Err(FetchError::SubrequestHttpError {
status_code: None,
Expand All @@ -359,7 +359,7 @@ async fn call_http(
if !content_type_str.contains(APPLICATION_JSON.essence_str())
&& !content_type_str.contains(GRAPHQL_JSON_RESPONSE_HEADER_VALUE)
{
cloned_context.leave_active_request().await;
cloned_context.leave_active_request();

return if !parts.status.is_success() {

Expand Down Expand Up @@ -387,7 +387,7 @@ async fn call_http(
.instrument(tracing::debug_span!("aggregate_response_data"))
.await {
Err(err) => {
cloned_context.leave_active_request().await;
cloned_context.leave_active_request();

tracing::error!(fetch_error = format!("{err:?}").as_str());

Expand All @@ -400,7 +400,7 @@ async fn call_http(
}, Ok(body) => body,
};

cloned_context.leave_active_request().await;
cloned_context.leave_active_request();

Ok((parts, body))
}.instrument(subgraph_req_span).await?;
Expand Down