From c76fbd742bb6dfe61162e978b5ba193dc83c7aa2 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 1 Mar 2021 19:41:07 -0500 Subject: [PATCH 1/5] fix retry for connection issues --- src/agent/Cargo.lock | 2 +- src/agent/reqwest-retry/Cargo.toml | 4 +- src/agent/reqwest-retry/src/lib.rs | 69 ++++++++++++++++++------------ 3 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 56bea19efc..377a131464 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -2134,7 +2134,7 @@ dependencies = [ "anyhow", "async-trait", "backoff", - "log", + "onefuzz-telemetry", "reqwest", "tokio", ] diff --git a/src/agent/reqwest-retry/Cargo.toml b/src/agent/reqwest-retry/Cargo.toml index 53287f0d8d..1f90de7d75 100644 --- a/src/agent/reqwest-retry/Cargo.toml +++ b/src/agent/reqwest-retry/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1.0" async-trait = "0.1" reqwest = { version = "0.10", features = ["json", "stream"] } backoff = { version = "0.2", features = ["async-std"] } -log = "0.4" +onefuzz-telemetry = { path = "../onefuzz-telemetry" } [dev-dependencies] -tokio = { version = "0.2" } \ No newline at end of file +tokio = { version = "0.2" , features=["macros"] } \ No newline at end of file diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index acf1556b03..cda5ab0bf1 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -4,6 +4,7 @@ use anyhow::Result; use async_trait::async_trait; use backoff::{self, future::FutureOperation, ExponentialBackoff}; +use onefuzz_telemetry::warn; use reqwest::{Response, StatusCode}; use std::{ sync::atomic::{AtomicI32, Ordering}, @@ -16,30 +17,35 @@ use std::io::ErrorKind; const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2); const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30); const MAX_RETRY_ATTEMPTS: i32 = 5; +const MAX_RETRY_ERROR_MESSAGE: &str = "Maximum number of attempts reached for this request"; -fn to_backoff_response( - result: Result, -) -> Result> { - fn is_transient_socket_error(error: &reqwest::Error) -> bool { - let source = error.source(); - if let Some(err) = source { - if let Some(io_error) = err.downcast_ref::() { - match io_error.kind() { - ErrorKind::ConnectionAborted - | ErrorKind::ConnectionReset - | ErrorKind::ConnectionRefused - | ErrorKind::TimedOut - | ErrorKind::NotConnected => return true, - _ => (), +fn is_transient_socket_error(error: &reqwest::Error) -> bool { + let source = error.source(); + if let Some(err) = source { + if let Some(io_error) = err.downcast_ref::() { + match io_error.kind() { + ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset + | ErrorKind::ConnectionRefused + | ErrorKind::TimedOut + | ErrorKind::NotConnected => { + return true; } + _ => (), } } - false } + false +} +fn to_backoff_response( + result: Result, +) -> Result> { match result { Err(error) => { - if is_transient_socket_error(&error) { + if error.is_connect() { + Err(backoff::Error::Transient(anyhow::Error::from(error))) + } else if is_transient_socket_error(&error) { Err(backoff::Error::Transient(anyhow::Error::from(error))) } else { Err(backoff::Error::Permanent(anyhow::Error::from(error))) @@ -91,12 +97,14 @@ pub async fn send_retry_reqwest< let op = || async { if counter.fetch_add(1, Ordering::SeqCst) >= max_retry { Result::>::Err(backoff::Error::Permanent( - anyhow::Error::msg("Maximum number of attempts reached for this request"), + anyhow::Error::msg(MAX_RETRY_ERROR_MESSAGE), )) } else { let request = build_request().map_err(backoff::Error::Permanent)?; let response = request.send().await; - Result::>::Ok(error_mapper(response)?) + let mapped = error_mapper(response)?; + + Result::>::Ok(mapped) } }; let result = op @@ -107,7 +115,7 @@ pub async fn send_retry_reqwest< max_elapsed_time: Some(max_elapsed_time), ..ExponentialBackoff::default() }, - |err, _| println!("Transient error: {}", err), + |err, _| warn!("transient http error: {}", err), ) .await?; Ok(result) @@ -173,17 +181,24 @@ impl SendRetry for reqwest::RequestBuilder { mod test { use super::*; - // TODO: convert to feature-gated integration test. - #[ignore] #[tokio::test] - async fn empty_stack() -> Result<()> { + async fn retry_should_fail() -> Result<()> { let resp = reqwest::Client::new() - .get("http://localhost:5000/api/testGet") - .send_retry_default() - .await?; - println!("{:?}", resp); + .get("http://localhost:81/test.txt") + .send_retry( + Duration::from_secs(1), + Duration::from_secs(3), + 3i32, + to_backoff_response, + ) + .await; + + assert!(resp.is_err()); + if let Err(err) = &resp { + let as_text = format!("{}", err); + assert!(as_text.contains("Maximum number of attempts reached for this request")); + } - assert!(resp.error_for_status().is_err()); Ok(()) } } From d20f16d13b35c3081e5370466542eeae5a88efd4 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 1 Mar 2021 19:46:59 -0500 Subject: [PATCH 2/5] add success test --- src/agent/reqwest-retry/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index cda5ab0bf1..cf320ca526 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -181,6 +181,16 @@ impl SendRetry for reqwest::RequestBuilder { mod test { use super::*; + #[tokio::test] + async fn retry_should_pass() -> Result<()> { + reqwest::Client::new() + .get("https://www.microsoft.com") + .send_retry_default() + .await?.error_for_status()?; + + Ok(()) + } + #[tokio::test] async fn retry_should_fail() -> Result<()> { let resp = reqwest::Client::new() From 0ef93b64b3d24f8877f18f241e8b7ce4edb15ddc Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 1 Mar 2021 19:53:59 -0500 Subject: [PATCH 3/5] cargo fmt --- src/agent/reqwest-retry/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index cf320ca526..ede4546d5c 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -186,7 +186,8 @@ mod test { reqwest::Client::new() .get("https://www.microsoft.com") .send_retry_default() - .await?.error_for_status()?; + .await? + .error_for_status()?; Ok(()) } From 963a48e4ce767bcc6ce974b32affae469a6bc75a Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Mon, 1 Mar 2021 20:01:10 -0500 Subject: [PATCH 4/5] address suggestions --- src/agent/reqwest-retry/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index ede4546d5c..b074001f68 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -194,8 +194,9 @@ mod test { #[tokio::test] async fn retry_should_fail() -> Result<()> { + let invalid_url = "http://localhost:81/test.txt"; let resp = reqwest::Client::new() - .get("http://localhost:81/test.txt") + .get(invalid_url) .send_retry( Duration::from_secs(1), Duration::from_secs(3), @@ -204,10 +205,11 @@ mod test { ) .await; - assert!(resp.is_err()); if let Err(err) = &resp { let as_text = format!("{}", err); assert!(as_text.contains("Maximum number of attempts reached for this request")); + } else { + bail!("response to {} was expected to fail", invalid_url); } Ok(()) From 8e1d2b4dc4bf15f4e8ac21fec12f0da19b96a31f Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Tue, 2 Mar 2021 10:32:14 -0500 Subject: [PATCH 5/5] clippy --- src/agent/reqwest-retry/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index b074001f68..019f32d75e 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -43,9 +43,7 @@ fn to_backoff_response( ) -> Result> { match result { Err(error) => { - if error.is_connect() { - Err(backoff::Error::Transient(anyhow::Error::from(error))) - } else if is_transient_socket_error(&error) { + if error.is_connect() || is_transient_socket_error(&error) { Err(backoff::Error::Transient(anyhow::Error::from(error))) } else { Err(backoff::Error::Permanent(anyhow::Error::from(error)))