From 17de157e5bd94867df3a18195099da79624f1716 Mon Sep 17 00:00:00 2001 From: Nicholas Yang Date: Wed, 31 May 2023 10:07:07 -0400 Subject: [PATCH] refactor(turborepo): API Client Cleanup (#5084) ### Description I had this inside the HTTP cache PR, but it makes sense as a separate PR. Refactors the API client in the following ways: - Changes the retry logic to use try_clone from reqwest instead of the awkward (and unnecessarily general) `retry_future` function that relied on a closure that generates a Rust future. - Use a crate Error type with thiserror instead of anyhow. This is pretty standard. - Factor out some common logic into helper functions ### Testing Instructions Existing tests should suffice. Co-authored-by: --global --- Cargo.lock | 2 + crates/turborepo-api-client/Cargo.toml | 2 + crates/turborepo-api-client/src/error.rs | 34 ++++ crates/turborepo-api-client/src/lib.rs | 212 ++++++++--------------- crates/turborepo-api-client/src/retry.rs | 56 +++--- crates/turborepo-lib/src/commands/mod.rs | 2 +- 6 files changed, 143 insertions(+), 165 deletions(-) create mode 100644 crates/turborepo-api-client/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 3eb0f22533653..90533aa66df38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9309,7 +9309,9 @@ dependencies = [ "reqwest", "rustc_version_runtime", "serde", + "thiserror", "tokio", + "url", ] [[package]] diff --git a/crates/turborepo-api-client/Cargo.toml b/crates/turborepo-api-client/Cargo.toml index 42e0c7f66315e..1c324109661f0 100644 --- a/crates/turborepo-api-client/Cargo.toml +++ b/crates/turborepo-api-client/Cargo.toml @@ -15,4 +15,6 @@ chrono = { workspace = true, features = ["serde"] } reqwest = { workspace = true, features = ["json"] } rustc_version_runtime = "0.2.1" serde = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } +url = { workspace = true } diff --git a/crates/turborepo-api-client/src/error.rs b/crates/turborepo-api-client/src/error.rs new file mode 100644 index 0000000000000..8b4e45f74529e --- /dev/null +++ b/crates/turborepo-api-client/src/error.rs @@ -0,0 +1,34 @@ +use std::backtrace::Backtrace; + +use reqwest::header::ToStrError; +use thiserror::Error; + +use crate::CachingStatus; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Error making HTTP request: {0}")] + ReqwestError(#[from] reqwest::Error), + #[error("skipping HTTP Request, too many failures have occurred.\nLast error: {0}")] + TooManyFailures(#[from] Box), + #[error("Error parsing header: {0}")] + InvalidHeader(#[from] ToStrError), + #[error("Error parsing URL: {0}")] + InvalidUrl(#[from] url::ParseError), + #[error("unknown caching status: {0}")] + UnknownCachingStatus(String, #[backtrace] Backtrace), + #[error("unknown status {code}: {message}")] + UnknownStatus { + code: String, + message: String, + #[backtrace] + backtrace: Backtrace, + }, + #[error("{message}")] + CacheDisabled { + status: CachingStatus, + message: String, + }, +} + +pub type Result = std::result::Result; diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs index 30c50027713af..71dfc8ffd672d 100644 --- a/crates/turborepo-api-client/src/lib.rs +++ b/crates/turborepo-api-client/src/lib.rs @@ -1,11 +1,15 @@ -use std::{env, future::Future}; +#![feature(async_closure)] +#![feature(provide_any)] +#![feature(error_generic_member_access)] -use anyhow::{anyhow, Result}; -use reqwest::StatusCode; +use std::env; + +use reqwest::RequestBuilder; use serde::{Deserialize, Serialize}; -use crate::retry::retry_future; +pub use crate::error::{Error, Result}; +mod error; mod retry; #[derive(Debug, Clone, Deserialize)] @@ -115,54 +119,33 @@ pub struct APIClient { impl APIClient { pub async fn get_user(&self, token: &str) -> Result { - let response = self - .make_retryable_request(|| { - let url = self.make_url("/v2/user"); - let request_builder = self - .client - .get(url) - .header("User-Agent", self.user_agent.clone()) - .header("Authorization", format!("Bearer {}", token)) - .header("Content-Type", "application/json"); - - request_builder.send() - }) + let url = self.make_url("/v2/user"); + let request_builder = self + .client + .get(url) + .header("User-Agent", self.user_agent.clone()) + .header("Authorization", format!("Bearer {}", token)) + .header("Content-Type", "application/json"); + let response = retry::make_retryable_request(request_builder) .await? .error_for_status()?; - response.json().await.map_err(|err| { - anyhow!( - "Error getting user: {}", - err.status() - .and_then(|status| status.canonical_reason()) - .unwrap_or(&err.to_string()) - ) - }) + Ok(response.json().await?) } pub async fn get_teams(&self, token: &str) -> Result { - let response = self - .make_retryable_request(|| { - let request_builder = self - .client - .get(self.make_url("/v2/teams?limit=100")) - .header("User-Agent", self.user_agent.clone()) - .header("Content-Type", "application/json") - .header("Authorization", format!("Bearer {}", token)); - - request_builder.send() - }) + let request_builder = self + .client + .get(self.make_url("/v2/teams?limit=100")) + .header("User-Agent", self.user_agent.clone()) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", token)); + + let response = retry::make_retryable_request(request_builder) .await? .error_for_status()?; - response.json().await.map_err(|err| { - anyhow!( - "Error getting teams: {}", - err.status() - .and_then(|status| status.canonical_reason()) - .unwrap_or(&err.to_string()) - ) - }) + Ok(response.json().await?) } pub async fn get_team(&self, token: &str, team_id: &str) -> Result> { @@ -177,14 +160,22 @@ impl APIClient { .await? .error_for_status()?; - response.json().await.map_err(|err| { - anyhow!( - "Error getting team: {}", - err.status() - .and_then(|status| status.canonical_reason()) - .unwrap_or(&err.to_string()) - ) - }) + Ok(response.json().await?) + } + + fn add_team_params( + mut request_builder: RequestBuilder, + team_id: &str, + team_slug: Option<&str>, + ) -> RequestBuilder { + if let Some(slug) = team_slug { + request_builder = request_builder.query(&[("teamSlug", slug)]); + } + if team_id.starts_with("team_") { + request_builder = request_builder.query(&[("teamId", team_id)]); + } + + request_builder } pub async fn get_caching_status( @@ -193,35 +184,20 @@ impl APIClient { team_id: &str, team_slug: Option<&str>, ) -> Result { - let response = self - .make_retryable_request(|| { - let mut request_builder = self - .client - .get(self.make_url("/v8/artifacts/status")) - .header("User-Agent", self.user_agent.clone()) - .header("Content-Type", "application/json") - .header("Authorization", format!("Bearer {}", token)); - - if let Some(slug) = team_slug { - request_builder = request_builder.query(&[("teamSlug", slug)]); - } - if team_id.starts_with("team_") { - request_builder = request_builder.query(&[("teamId", team_id)]); - } - - request_builder.send() - }) + let request_builder = self + .client + .get(self.make_url("/v8/artifacts/status")) + .header("User-Agent", self.user_agent.clone()) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", token)); + + let request_builder = Self::add_team_params(request_builder, team_id, team_slug); + + let response = retry::make_retryable_request(request_builder) .await? .error_for_status()?; - response.json().await.map_err(|err| { - anyhow!( - "Error getting caching status: {}", - err.status() - .and_then(|status| status.canonical_reason()) - .unwrap_or(&err.to_string()) - ) - }) + Ok(response.json().await?) } pub async fn get_spaces(&self, token: &str, team_id: Option<&str>) -> Result { @@ -231,84 +207,40 @@ impl APIClient { None => "/v0/spaces?limit=100".to_string(), }; - let response = self - .make_retryable_request(|| { - let request_builder = self - .client - .get(self.make_url(endpoint.as_str())) - .header("User-Agent", self.user_agent.clone()) - .header("Content-Type", "application/json") - .header("Authorization", format!("Bearer {}", token)); - - request_builder.send() - }) + let request_builder = self + .client + .get(self.make_url(endpoint.as_str())) + .header("User-Agent", self.user_agent.clone()) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", token)); + + let response = retry::make_retryable_request(request_builder) .await? .error_for_status()?; - response.json().await.map_err(|err| { - anyhow!( - "Error getting spaces: {}", - err.status() - .and_then(|status| status.canonical_reason()) - .unwrap_or(&err.to_string()) - ) - }) + Ok(response.json().await?) } pub async fn verify_sso_token(&self, token: &str, token_name: &str) -> Result { - let response = self - .make_retryable_request(|| { - let request_builder = self - .client - .get(self.make_url("/registration/verify")) - .query(&[("token", token), ("tokenName", token_name)]) - .header("User-Agent", self.user_agent.clone()); - - request_builder.send() - }) + let request_builder = self + .client + .get(self.make_url("/registration/verify")) + .query(&[("token", token), ("tokenName", token_name)]) + .header("User-Agent", self.user_agent.clone()); + + let response = retry::make_retryable_request(request_builder) .await? .error_for_status()?; - let verification_response: VerificationResponse = response.json().await.map_err(|err| { - anyhow!( - "Error verifying token: {}", - err.status() - .and_then(|status| status.canonical_reason()) - .unwrap_or(&err.to_string()) - ) - })?; + let verification_response: VerificationResponse = response.json().await?; + Ok(VerifiedSsoUser { token: verification_response.token, team_id: verification_response.team_id, }) } - const RETRY_MAX: u32 = 2; - - async fn make_retryable_request< - F: Future>, - >( - &self, - request_builder: impl Fn() -> F, - ) -> Result { - retry_future(Self::RETRY_MAX, request_builder, Self::should_retry_request).await - } - - fn should_retry_request(error: &reqwest::Error) -> bool { - if let Some(status) = error.status() { - if status == StatusCode::TOO_MANY_REQUESTS { - return true; - } - - if status.as_u16() >= 500 && status.as_u16() != 501 { - return true; - } - } - - false - } - - pub fn new(base_url: impl AsRef, timeout: u64, version: &'static str) -> Result { + pub fn new(base_url: impl AsRef, timeout: u64, version: &str) -> Result { let client = if timeout != 0 { reqwest::Client::builder() .timeout(std::time::Duration::from_secs(timeout)) diff --git a/crates/turborepo-api-client/src/retry.rs b/crates/turborepo-api-client/src/retry.rs index 359f231b534c4..c9bf8da9f9154 100644 --- a/crates/turborepo-api-client/src/retry.rs +++ b/crates/turborepo-api-client/src/retry.rs @@ -1,35 +1,32 @@ -use std::future::Future; - -use anyhow::anyhow; +use reqwest::{RequestBuilder, Response, StatusCode}; use tokio::time::sleep; +use crate::Error; + const MIN_SLEEP_TIME_SECS: u64 = 2; const MAX_SLEEP_TIME_SECS: u64 = 10; +const RETRY_MAX: u32 = 2; -/// Retries a future until `max_retries` is reached, the `should_retry` function -/// returns false, or the future succeeds. Uses an exponential backoff with a -/// base of 2 to delay between retries. +/// Retries a request until `RETRY_MAX` is reached, the `should_retry_request` +/// function returns false, or the future succeeds. Uses an exponential backoff +/// with a base of 2 to delay between retries. /// /// # Arguments /// -/// * `max_retries`: Maximum number of retries -/// * `future_generator`: Function to call to generate the future for each retry -/// * `should_retry`: Determines if a retry should be attempted based on the -/// error +/// * `request_builder`: The request builder with everything, i.e. headers and +/// body already set. NOTE: This must be cloneable, so no streams are allowed. /// -/// returns: Result -pub async fn retry_future, F: Future>>( - max_retries: u32, - future_generator: impl Fn() -> F, - should_retry: impl Fn(&E) -> bool, -) -> Result { +/// returns: Result +pub(crate) async fn make_retryable_request( + request_builder: RequestBuilder, +) -> Result { let mut last_error = None; - for retry_count in 0..max_retries { - let future = future_generator(); - match future.await { + for retry_count in 0..RETRY_MAX { + let builder = request_builder.try_clone().expect("cannot clone request"); + match builder.send().await { Ok(value) => return Ok(value), Err(err) => { - if !should_retry(&err) { + if !should_retry_request(&err) { return Err(err.into()); } last_error = Some(err); @@ -42,8 +39,19 @@ pub async fn retry_future, F: Future bool { + if let Some(status) = error.status() { + if status == StatusCode::TOO_MANY_REQUESTS { + return true; + } + + if status.as_u16() >= 500 && status.as_u16() != 501 { + return true; + } + } + + false } diff --git a/crates/turborepo-lib/src/commands/mod.rs b/crates/turborepo-lib/src/commands/mod.rs index 1214583c2ef82..d8adadfe95376 100644 --- a/crates/turborepo-lib/src/commands/mod.rs +++ b/crates/turborepo-lib/src/commands/mod.rs @@ -147,7 +147,7 @@ impl CommandBase { let api_url = repo_config.api_url(); let timeout = client_config.remote_cache_timeout(); - APIClient::new(api_url, timeout, self.version) + Ok(APIClient::new(api_url, timeout, self.version)?) } pub fn daemon_file_root(&self) -> turbopath::AbsoluteSystemPathBuf {