Skip to content

Commit

Permalink
add two retry strategies to allow requests to timeout gracefully (#8080)
Browse files Browse the repository at this point in the history
Certain APIs should time out in different ways. For the cache, we want
to retry connection attempts but not retry upload timeouts as a retry is
unlikely to change your connection speed.
  • Loading branch information
arlyon authored and tknickman committed May 21, 2024
1 parent d2e5b97 commit 809acc9
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turborepo-api-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rustls-tls = ["reqwest/rustls-tls-native-roots"]

[dev-dependencies]
http = "0.2.9"
httpmock = { workspace = true }
port_scanner = { workspace = true }
test-case = { workspace = true }
turborepo-vercel-api-mock = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-api-client/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ impl AnalyticsClient for APIClient {
.await?
.json(&events);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
64 changes: 44 additions & 20 deletions crates/turborepo-api-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(async_closure)]
#![feature(error_generic_member_access)]
#![feature(assert_matches)]
#![deny(clippy::all)]

use std::{backtrace::Backtrace, env, future::Future, time::Duration};
Expand Down Expand Up @@ -134,9 +135,11 @@ impl Client for APIClient {
.header("User-Agent", self.user_agent.clone())
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json");
let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -149,9 +152,11 @@ impl Client for APIClient {
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand Down Expand Up @@ -194,9 +199,11 @@ impl Client for APIClient {
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -208,9 +215,11 @@ impl Client for APIClient {
.query(&[("token", token), ("tokenName", token_name)])
.header("User-Agent", self.user_agent.clone());

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

let verification_response: VerificationResponse = response.json().await?;

Expand Down Expand Up @@ -310,7 +319,9 @@ impl CacheClient for APIClient {

request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?;
let response = response.into_response();

match response.status() {
StatusCode::FORBIDDEN => Err(Self::handle_403(response).await),
Expand Down Expand Up @@ -391,7 +402,10 @@ impl CacheClient for APIClient {
request_builder = request_builder.header("x-artifact-tag", tag);
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Connection)
.await?
.into_response();

if response.status() == StatusCode::FORBIDDEN {
return Err(Self::handle_403(response).await);
Expand All @@ -416,9 +430,11 @@ impl CacheClient for APIClient {

let request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand Down Expand Up @@ -451,7 +467,9 @@ impl TokenClient for APIClient {
invalid_token: bool,
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?;
let response = response.into_response();
let status = response.status();
// Give a better error message for invalid tokens. This endpoint returns the
// following statuses:
Expand Down Expand Up @@ -503,7 +521,10 @@ impl TokenClient for APIClient {
invalid_token: bool,
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response();
let status = response.status();
// Give a better error message for invalid tokens. This endpoint returns the
// following statuses:
Expand Down Expand Up @@ -604,7 +625,10 @@ impl APIClient {
.header("Access-Control-Request-Headers", request_headers)
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response();

let headers = response.headers();
let location = if let Some(location) = headers.get("Location") {
Expand Down
129 changes: 118 additions & 11 deletions crates/turborepo-api-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ const MIN_SLEEP_TIME_SECS: u64 = 2;
const MAX_SLEEP_TIME_SECS: u64 = 10;
const RETRY_MAX: u32 = 2;

#[derive(Debug)]
pub enum Retry {
Once(Response),
#[allow(dead_code)]
Retried(Response, u32),
}

impl Retry {
pub fn into_response(self) -> Response {
match self {
Retry::Once(response) => response,
Retry::Retried(response, _) => response,
}
}

#[allow(dead_code)]
pub fn retry_count(&self) -> Option<u32> {
match self {
Retry::Once(_) => None,
Retry::Retried(_, count) => Some(*count),
}
}
}

/// Retries a request until `RETRY_MAX` is reached, the `should_retry_request`
/// function returns false, or the future succeeds. Uses an exponential backoff
/// with a base of 2 to delay between retries.
Expand All @@ -15,11 +39,13 @@ const RETRY_MAX: u32 = 2;
///
/// * `request_builder`: The request builder with everything, i.e. headers and
/// body already set. NOTE: This must be cloneable, so no streams are allowed.
/// * `strategy`: The strategy to use for retrying requests.
///
/// returns: Result<Response, Error>
pub(crate) async fn make_retryable_request(
request_builder: RequestBuilder,
) -> Result<Response, Error> {
strategy: RetryStrategy,
) -> Result<Retry, Error> {
let mut last_error = None;
for retry_count in 0..RETRY_MAX {
// A request builder can fail to clone for two reasons:
Expand All @@ -28,12 +54,12 @@ pub(crate) async fn make_retryable_request(
// - the request body is a stream, in this case we'll just send the one request
// we have
let Some(builder) = request_builder.try_clone() else {
return Ok(request_builder.send().await?);
return Ok(Retry::Once(request_builder.send().await?));
};
match builder.send().await {
Ok(value) => return Ok(value),
Ok(value) => return Ok(Retry::Retried(value, retry_count)),
Err(err) => {
if !should_retry_request(&err) {
if !strategy.should_retry(&err) {
return Err(err.into());
}
last_error = Some(err);
Expand All @@ -49,16 +75,97 @@ pub(crate) async fn make_retryable_request(
Err(Error::TooManyFailures(Box::new(last_error.unwrap())))
}

fn should_retry_request(error: &reqwest::Error) -> bool {
if let Some(status) = error.status() {
if status == StatusCode::TOO_MANY_REQUESTS {
return true;
/// A retry strategy. Note that error statuses and TOO_MANY_REQUESTS are always
/// retried.
pub enum RetryStrategy {
/// Retry in the case of connection issues, but ignore timeouts.
Connection,
/// Retry in the case of connection issues and timeouts.
Timeout,
}

impl RetryStrategy {
fn should_retry(&self, error: &reqwest::Error) -> bool {
if let Some(status) = error.status() {
if status == StatusCode::TOO_MANY_REQUESTS {
return true;
}

if status.as_u16() >= 500 && status.as_u16() != 501 {
return true;
}
}

if status.as_u16() >= 500 && status.as_u16() != 501 {
return true;
match self {
RetryStrategy::Connection => error.is_connect(),
RetryStrategy::Timeout => error.is_timeout(),
}
}
}

error.is_request() || error.is_timeout()
#[cfg(test)]
mod test {
use std::{assert_matches::assert_matches, time::Duration};

use crate::{
retry::{make_retryable_request, RetryStrategy},
Error,
};

#[tokio::test]
async fn handles_too_many_failures() {
let mock = httpmock::MockServer::start_async().await;
let req = mock
.mock_async(|when, then| {
when.method(httpmock::Method::GET);
then.delay(Duration::from_secs(100));
})
.await;

let request_builder = reqwest::Client::new()
.get(mock.url("/"))
.timeout(Duration::from_millis(10));
let result = make_retryable_request(request_builder, RetryStrategy::Timeout).await;

req.assert_hits_async(2).await;
assert_matches!(result, Err(Error::TooManyFailures(_)));
}

#[tokio::test]
async fn handles_connection_timeout() {
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(10))
.build()
.unwrap();

let request_builder = client.get("http://localhost:1").send().await; // bad port
let should_retry = RetryStrategy::Connection.should_retry(&request_builder.unwrap_err());

assert_matches!(should_retry, true);
}

#[tokio::test]
async fn handles_connection_timeout_retries() {
let client = reqwest::Client::builder()
.timeout(Duration::from_millis(20))
.connect_timeout(Duration::from_millis(10))
.build()
.unwrap();

let mock = httpmock::MockServer::start_async().await;
let req = mock
.mock_async(|when, then| {
when.method(httpmock::Method::GET);
then.delay(Duration::from_secs(100));
})
.await;

let request_builder = client.get(mock.url("/")); // bad port
let result = make_retryable_request(request_builder, RetryStrategy::Connection).await;

// we should make at most one request and give up if it times out after
// connecting
assert_matches!(result, Err(_));
req.assert_hits_async(1).await;
}
}
14 changes: 9 additions & 5 deletions crates/turborepo-api-client/src/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ impl APIClient {
.await?
.json(&payload);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -176,8 +178,9 @@ impl APIClient {
.await?
.json(&task);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand All @@ -201,8 +204,9 @@ impl APIClient {
.await?
.json(&payload);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-api-client/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ impl TelemetryClient for AnonAPIClient {
.header("x-turbo-session-id", session_id)
.json(&events);

retry::make_retryable_request(telemetry_request)
retry::make_retryable_request(telemetry_request, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down

0 comments on commit 809acc9

Please sign in to comment.