Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add two retry strategies to allow requests to timeout gracefully #8080

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turborepo-api-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rustls-tls = ["reqwest/rustls-tls-native-roots"]

[dev-dependencies]
http = "0.2.9"
httpmock = { workspace = true }
port_scanner = { workspace = true }
test-case = { workspace = true }
turborepo-vercel-api-mock = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-api-client/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ impl AnalyticsClient for APIClient {
.await?
.json(&events);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
64 changes: 44 additions & 20 deletions crates/turborepo-api-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(async_closure)]
#![feature(error_generic_member_access)]
#![feature(assert_matches)]
#![deny(clippy::all)]

use std::{backtrace::Backtrace, env, future::Future, time::Duration};
Expand Down Expand Up @@ -134,9 +135,11 @@ impl Client for APIClient {
.header("User-Agent", self.user_agent.clone())
.header("Authorization", format!("Bearer {}", token))
.header("Content-Type", "application/json");
let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -149,9 +152,11 @@ impl Client for APIClient {
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand Down Expand Up @@ -194,9 +199,11 @@ impl Client for APIClient {
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -208,9 +215,11 @@ impl Client for APIClient {
.query(&[("token", token), ("tokenName", token_name)])
.header("User-Agent", self.user_agent.clone());

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

let verification_response: VerificationResponse = response.json().await?;

Expand Down Expand Up @@ -310,7 +319,9 @@ impl CacheClient for APIClient {

request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?;
let response = response.into_response();

match response.status() {
StatusCode::FORBIDDEN => Err(Self::handle_403(response).await),
Expand Down Expand Up @@ -391,7 +402,10 @@ impl CacheClient for APIClient {
request_builder = request_builder.header("x-artifact-tag", tag);
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Connection)
.await?
.into_response();

if response.status() == StatusCode::FORBIDDEN {
return Err(Self::handle_403(response).await);
Expand All @@ -416,9 +430,11 @@ impl CacheClient for APIClient {

let request_builder = Self::add_team_params(request_builder, team_id, team_slug);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand Down Expand Up @@ -451,7 +467,9 @@ impl TokenClient for APIClient {
invalid_token: bool,
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout).await?;
let response = response.into_response();
let status = response.status();
// Give a better error message for invalid tokens. This endpoint returns the
// following statuses:
Expand Down Expand Up @@ -503,7 +521,10 @@ impl TokenClient for APIClient {
invalid_token: bool,
}

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response();
let status = response.status();
// Give a better error message for invalid tokens. This endpoint returns the
// following statuses:
Expand Down Expand Up @@ -604,7 +625,10 @@ impl APIClient {
.header("Access-Control-Request-Headers", request_headers)
.header("Authorization", format!("Bearer {}", token));

let response = retry::make_retryable_request(request_builder).await?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response();

let headers = response.headers();
let location = if let Some(location) = headers.get("Location") {
Expand Down
129 changes: 118 additions & 11 deletions crates/turborepo-api-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@ const MIN_SLEEP_TIME_SECS: u64 = 2;
const MAX_SLEEP_TIME_SECS: u64 = 10;
const RETRY_MAX: u32 = 2;

#[derive(Debug)]
pub enum Retry {
Once(Response),
#[allow(dead_code)]
Retried(Response, u32),
}

impl Retry {
pub fn into_response(self) -> Response {
match self {
Retry::Once(response) => response,
Retry::Retried(response, _) => response,
}
}

#[allow(dead_code)]
pub fn retry_count(&self) -> Option<u32> {
match self {
Retry::Once(_) => None,
Retry::Retried(_, count) => Some(*count),
}
}
}

/// Retries a request until `RETRY_MAX` is reached, the `should_retry_request`
/// function returns false, or the future succeeds. Uses an exponential backoff
/// with a base of 2 to delay between retries.
Expand All @@ -15,11 +39,13 @@ const RETRY_MAX: u32 = 2;
///
/// * `request_builder`: The request builder with everything, i.e. headers and
/// body already set. NOTE: This must be cloneable, so no streams are allowed.
/// * `strategy`: The strategy to use for retrying requests.
///
/// returns: Result<Response, Error>
pub(crate) async fn make_retryable_request(
request_builder: RequestBuilder,
) -> Result<Response, Error> {
strategy: RetryStrategy,
) -> Result<Retry, Error> {
let mut last_error = None;
for retry_count in 0..RETRY_MAX {
// A request builder can fail to clone for two reasons:
Expand All @@ -28,12 +54,12 @@ pub(crate) async fn make_retryable_request(
// - the request body is a stream, in this case we'll just send the one request
// we have
let Some(builder) = request_builder.try_clone() else {
return Ok(request_builder.send().await?);
return Ok(Retry::Once(request_builder.send().await?));
};
match builder.send().await {
Ok(value) => return Ok(value),
Ok(value) => return Ok(Retry::Retried(value, retry_count)),
Err(err) => {
if !should_retry_request(&err) {
if !strategy.should_retry(&err) {
return Err(err.into());
}
last_error = Some(err);
Expand All @@ -49,16 +75,97 @@ pub(crate) async fn make_retryable_request(
Err(Error::TooManyFailures(Box::new(last_error.unwrap())))
}

fn should_retry_request(error: &reqwest::Error) -> bool {
if let Some(status) = error.status() {
if status == StatusCode::TOO_MANY_REQUESTS {
return true;
/// A retry strategy. Note that error statuses and TOO_MANY_REQUESTS are always
/// retried.
pub enum RetryStrategy {
/// Retry in the case of connection issues, but ignore timeouts.
Connection,
/// Retry in the case of connection issues and timeouts.
Timeout,
}

impl RetryStrategy {
fn should_retry(&self, error: &reqwest::Error) -> bool {
if let Some(status) = error.status() {
if status == StatusCode::TOO_MANY_REQUESTS {
return true;
}

if status.as_u16() >= 500 && status.as_u16() != 501 {
return true;
}
}

if status.as_u16() >= 500 && status.as_u16() != 501 {
return true;
match self {
RetryStrategy::Connection => error.is_connect(),
RetryStrategy::Timeout => error.is_timeout(),
}
}
}

error.is_request() || error.is_timeout()
#[cfg(test)]
mod test {
use std::{assert_matches::assert_matches, time::Duration};

use crate::{
retry::{make_retryable_request, RetryStrategy},
Error,
};

#[tokio::test]
async fn handles_too_many_failures() {
let mock = httpmock::MockServer::start_async().await;
let req = mock
.mock_async(|when, then| {
when.method(httpmock::Method::GET);
then.delay(Duration::from_secs(100));
})
.await;

let request_builder = reqwest::Client::new()
.get(mock.url("/"))
.timeout(Duration::from_millis(10));
let result = make_retryable_request(request_builder, RetryStrategy::Timeout).await;

req.assert_hits_async(2).await;
assert_matches!(result, Err(Error::TooManyFailures(_)));
}

#[tokio::test]
async fn handles_connection_timeout() {
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(10))
.build()
.unwrap();

let request_builder = client.get("http://localhost:1").send().await; // bad port
let should_retry = RetryStrategy::Connection.should_retry(&request_builder.unwrap_err());

assert_matches!(should_retry, true);
}

#[tokio::test]
async fn handles_connection_timeout_retries() {
let client = reqwest::Client::builder()
.timeout(Duration::from_millis(20))
.connect_timeout(Duration::from_millis(10))
.build()
.unwrap();

let mock = httpmock::MockServer::start_async().await;
let req = mock
.mock_async(|when, then| {
when.method(httpmock::Method::GET);
then.delay(Duration::from_secs(100));
})
.await;

let request_builder = client.get(mock.url("/")); // bad port
let result = make_retryable_request(request_builder, RetryStrategy::Connection).await;

// we should make at most one request and give up if it times out after
// connecting
assert_matches!(result, Err(_));
req.assert_hits_async(1).await;
}
}
14 changes: 9 additions & 5 deletions crates/turborepo-api-client/src/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ impl APIClient {
.await?
.json(&payload);

let response = retry::make_retryable_request(request_builder)
.await?
.error_for_status()?;
let response =
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(response.json().await?)
}
Expand All @@ -176,8 +178,9 @@ impl APIClient {
.await?
.json(&task);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand All @@ -201,8 +204,9 @@ impl APIClient {
.await?
.json(&payload);

retry::make_retryable_request(request_builder)
retry::make_retryable_request(request_builder, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-api-client/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ impl TelemetryClient for AnonAPIClient {
.header("x-turbo-session-id", session_id)
.json(&events);

retry::make_retryable_request(telemetry_request)
retry::make_retryable_request(telemetry_request, retry::RetryStrategy::Timeout)
.await?
.into_response()
.error_for_status()?;

Ok(())
Expand Down
Loading