From 9fc8be114f635067cc6890092b92e4e9f023ceeb Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 2 Apr 2021 19:36:37 -0400 Subject: [PATCH 1/3] fix checking status codes for failure --- src/agent/reqwest-retry/src/lib.rs | 43 +++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index 4534f416b7..360891b826 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -46,14 +46,20 @@ pub async fn send_retry_reqwest Result + Sen } } Ok(x) => { - if x.status().is_success() { - Ok(x) - } else { - let fail_fast = fail_fast_status.as_ref().contains(&x.status()); - if attempt_count >= max_retry || fail_fast { - Err(backoff::Error::Permanent(Ok(x))) - } else { - Err(backoff::Error::Transient(Ok(x))) + let status = x.status(); + let response = x + .error_for_status() + .with_context(|| format!("request attempt {} failed", attempt_count + 1,)); + + match response { + Ok(x) => Ok(x), + Err(as_err) => { + let fail_fast = fail_fast_status.as_ref().contains(&status); + if attempt_count >= max_retry || fail_fast { + Err(backoff::Error::Permanent(Err(as_err))) + } else { + Err(backoff::Error::Transient(Err(as_err))) + } } } } @@ -130,11 +136,10 @@ mod test { #[tokio::test] async fn retry_should_pass() -> Result<()> { reqwest::Client::new() - .get("https://www.microsoft.com") + .get("https://httpstat.us/200") .send_retry_default() .await? .error_for_status()?; - Ok(()) } @@ -155,4 +160,22 @@ mod test { Ok(()) } + + #[tokio::test] + async fn retry_should_fail_404() -> Result<()> { + let invalid_url = "https://httpstat.us/400"; + let resp = reqwest::Client::new() + .get(invalid_url) + .send_retry(vec![], Duration::from_millis(1), 1) + .await; + + if let Err(err) = &resp { + let as_text = format!("{:?}", err); + assert!(as_text.contains("request attempt 2 failed")); + } else { + anyhow::bail!("response to {} was expected to fail", invalid_url); + } + + Ok(()) + } } From c355cba7cb30643d4c69ed7ddde3ba5822cbba03 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 2 Apr 2021 20:39:25 -0400 Subject: [PATCH 2/3] added extra_success_status --- .../onefuzz-supervisor/src/coordinator.rs | 1 + src/agent/onefuzz/src/syncdir.rs | 1 + src/agent/onefuzz/src/uploader.rs | 1 + src/agent/reqwest-retry/src/lib.rs | 41 +++++++++++++++++-- 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/coordinator.rs b/src/agent/onefuzz-supervisor/src/coordinator.rs index d34ce6dfa4..f9dc9d281d 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator.rs @@ -237,6 +237,7 @@ impl Coordinator { let mut response = request .send_retry( vec![StatusCode::UNAUTHORIZED], + vec![], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, ) diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index bcbb29a996..9900dec3f3 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -143,6 +143,7 @@ impl SyncedDir { // https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations .header("If-None-Match", "*") .send_retry( + vec![], vec![StatusCode::CONFLICT], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, diff --git a/src/agent/onefuzz/src/uploader.rs b/src/agent/onefuzz/src/uploader.rs index 2d6655d681..3bc9795f45 100644 --- a/src/agent/onefuzz/src/uploader.rs +++ b/src/agent/onefuzz/src/uploader.rs @@ -48,6 +48,7 @@ impl BlobUploader { .head(url.clone()) .send_retry( vec![reqwest::StatusCode::NOT_FOUND], + vec![], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, ) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index 360891b826..efca7ec277 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -19,12 +19,20 @@ pub async fn send_retry_reqwest_default< >( build_request: F, ) -> Result { - send_retry_reqwest(build_request, [], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS).await + send_retry_reqwest( + build_request, + [], + [], + DEFAULT_RETRY_PERIOD, + MAX_RETRY_ATTEMPTS, + ) + .await } pub async fn send_retry_reqwest Result + Send + Sync>( build_request: F, fail_fast_status: impl AsRef<[StatusCode]>, + extra_success_status: impl AsRef<[StatusCode]>, retry_period: Duration, max_retry: usize, ) -> Result { @@ -47,6 +55,10 @@ pub async fn send_retry_reqwest Result + Sen } Ok(x) => { let status = x.status(); + if extra_success_status.as_ref().contains(&status) { + return Ok(x); + } + let response = x .error_for_status() .with_context(|| format!("request attempt {} failed", attempt_count + 1,)); @@ -94,6 +106,7 @@ pub trait SendRetry { async fn send_retry( self, fail_fast_status: Vec, + extra_success_status: Vec, retry_period: Duration, max_retry: usize, ) -> Result; @@ -103,13 +116,14 @@ pub trait SendRetry { #[async_trait] impl SendRetry for reqwest::RequestBuilder { async fn send_retry_default(self) -> Result { - self.send_retry(vec![], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS) + self.send_retry(vec![], vec![], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS) .await } async fn send_retry( self, fail_fast_status: Vec, + extra_success_status: Vec, retry_period: Duration, max_retry: usize, ) -> Result { @@ -120,6 +134,7 @@ impl SendRetry for reqwest::RequestBuilder { }) }, fail_fast_status, + extra_success_status, retry_period, max_retry, ) @@ -148,7 +163,7 @@ mod test { let invalid_url = "http://127.0.0.1:81/test.txt"; let resp = reqwest::Client::new() .get(invalid_url) - .send_retry(vec![], Duration::from_millis(1), 3) + .send_retry(vec![], vec![], Duration::from_millis(1), 3) .await; if let Err(err) = &resp { @@ -166,7 +181,7 @@ mod test { let invalid_url = "https://httpstat.us/400"; let resp = reqwest::Client::new() .get(invalid_url) - .send_retry(vec![], Duration::from_millis(1), 1) + .send_retry(vec![], vec![], Duration::from_millis(1), 1) .await; if let Err(err) = &resp { @@ -178,4 +193,22 @@ mod test { Ok(()) } + + #[tokio::test] + async fn extra_error_success() -> Result<()> { + let invalid_url = "https://httpstat.us/400"; + let resp = reqwest::Client::new() + .get(invalid_url) + .send_retry( + vec![], + vec![StatusCode::BAD_REQUEST], + Duration::from_millis(1), + 1, + ) + .await?; + + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + + Ok(()) + } } From 0c9c5b91d7e4e955ec8f69ce4f42ca92f59aba66 Mon Sep 17 00:00:00 2001 From: Brian Caswell Date: Fri, 2 Apr 2021 23:04:23 -0400 Subject: [PATCH 3/3] address feedback --- .../onefuzz-supervisor/src/coordinator.rs | 8 +- src/agent/onefuzz/src/syncdir.rs | 8 +- src/agent/onefuzz/src/uploader.rs | 26 +-- src/agent/reqwest-retry/src/lib.rs | 177 ++++++++++++------ 4 files changed, 146 insertions(+), 73 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/coordinator.rs b/src/agent/onefuzz-supervisor/src/coordinator.rs index f9dc9d281d..38682cd3c4 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator.rs @@ -5,7 +5,7 @@ use anyhow::Result; use downcast_rs::Downcast; use onefuzz::{auth::AccessToken, http::ResponseExt, process::Output}; use reqwest::{Client, RequestBuilder, Response, StatusCode}; -use reqwest_retry::{SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; +use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; use serde::Serialize; use uuid::Uuid; @@ -236,8 +236,10 @@ impl Coordinator { let request = self.get_request_builder(request_type.clone()); let mut response = request .send_retry( - vec![StatusCode::UNAUTHORIZED], - vec![], + |code| match code { + StatusCode::UNAUTHORIZED => RetryCheck::Fail, + _ => RetryCheck::Retry, + }, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, ) diff --git a/src/agent/onefuzz/src/syncdir.rs b/src/agent/onefuzz/src/syncdir.rs index 9900dec3f3..e83f868531 100644 --- a/src/agent/onefuzz/src/syncdir.rs +++ b/src/agent/onefuzz/src/syncdir.rs @@ -13,7 +13,7 @@ use anyhow::{Context, Result}; use futures::stream::StreamExt; use onefuzz_telemetry::{Event, EventData}; use reqwest::StatusCode; -use reqwest_retry::{SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; +use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS}; use serde::{Deserialize, Serialize}; use std::{path::PathBuf, str, time::Duration}; use tokio::fs; @@ -143,8 +143,10 @@ impl SyncedDir { // https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations .header("If-None-Match", "*") .send_retry( - vec![], - vec![StatusCode::CONFLICT], + |code| match code { + StatusCode::CONFLICT => RetryCheck::Succeed, + _ => RetryCheck::Retry, + }, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, ) diff --git a/src/agent/onefuzz/src/uploader.rs b/src/agent/onefuzz/src/uploader.rs index 3bc9795f45..1ebd69e886 100644 --- a/src/agent/onefuzz/src/uploader.rs +++ b/src/agent/onefuzz/src/uploader.rs @@ -5,9 +5,9 @@ use std::path::Path; use anyhow::Result; use futures::stream::TryStreamExt; -use reqwest as r; +use reqwest::{Body, Client, Response, StatusCode, Url}; use reqwest_retry::{ - send_retry_reqwest_default, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, + send_retry_reqwest_default, RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, }; use serde::Serialize; use tokio::{fs, io}; @@ -15,18 +15,18 @@ use tokio_util::codec; #[derive(Clone)] pub struct BlobUploader { - client: r::Client, - url: r::Url, + client: Client, + url: Url, } impl BlobUploader { - pub fn new(url: r::Url) -> Self { - let client = r::Client::new(); + pub fn new(url: Url) -> Self { + let client = Client::new(); Self { client, url } } - pub async fn upload(&mut self, file_path: impl AsRef) -> Result { + pub async fn upload(&mut self, file_path: impl AsRef) -> Result { let file_path = file_path.as_ref(); let file_name = file_path.file_name().unwrap().to_str().unwrap(); @@ -47,14 +47,16 @@ impl BlobUploader { .client .head(url.clone()) .send_retry( - vec![reqwest::StatusCode::NOT_FOUND], - vec![], + |code| match code { + StatusCode::NOT_FOUND => RetryCheck::Fail, + _ => RetryCheck::Retry, + }, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, ) .await { - if head.status() == reqwest::StatusCode::OK { + if head.status() == StatusCode::OK { return Ok(head); } } @@ -72,7 +74,7 @@ impl BlobUploader { .put(url.clone()) .header("Content-Length", &content_length) .header("x-ms-blob-type", "BlockBlob") - .body(r::Body::wrap_stream(file_stream)); + .body(Body::wrap_stream(file_stream)); Ok(request_builder) }) @@ -85,7 +87,7 @@ impl BlobUploader { &mut self, data: D, name: impl AsRef, - ) -> Result { + ) -> Result { let url = { let url_path = self.url.path(); let blob_path = format!("{}/{}", url_path, name.as_ref()); diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index efca7ec277..4094027eb5 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -14,6 +14,16 @@ use std::{ pub const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(5); pub const MAX_RETRY_ATTEMPTS: usize = 5; +pub enum RetryCheck { + Retry, + Fail, + Succeed, +} + +fn always_retry(_: StatusCode) -> RetryCheck { + RetryCheck::Retry +} + pub async fn send_retry_reqwest_default< F: Fn() -> Result + Send + Sync, >( @@ -21,21 +31,23 @@ pub async fn send_retry_reqwest_default< ) -> Result { send_retry_reqwest( build_request, - [], - [], + |_| RetryCheck::Retry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS, ) .await } -pub async fn send_retry_reqwest Result + Send + Sync>( +pub async fn send_retry_reqwest( build_request: F, - fail_fast_status: impl AsRef<[StatusCode]>, - extra_success_status: impl AsRef<[StatusCode]>, + check_status: R, retry_period: Duration, max_retry: usize, -) -> Result { +) -> Result +where + F: Fn() -> Result + Send + Sync, + R: Fn(StatusCode) -> RetryCheck + Send + Sync, +{ let counter = AtomicUsize::new(0); let op = || async { let attempt_count = counter.fetch_add(1, Ordering::SeqCst); @@ -54,23 +66,37 @@ pub async fn send_retry_reqwest Result + Sen } } Ok(x) => { - let status = x.status(); - if extra_success_status.as_ref().contains(&status) { - return Ok(x); - } + if x.status().is_success() { + Ok(x) + } else { + let status = x.status(); + let result = check_status(status); - let response = x - .error_for_status() - .with_context(|| format!("request attempt {} failed", attempt_count + 1,)); - - match response { - Ok(x) => Ok(x), - Err(as_err) => { - let fail_fast = fail_fast_status.as_ref().contains(&status); - if attempt_count >= max_retry || fail_fast { - Err(backoff::Error::Permanent(Err(as_err))) - } else { - Err(backoff::Error::Transient(Err(as_err))) + match result { + RetryCheck::Succeed => Ok(x), + RetryCheck::Fail => { + match x.error_for_status().with_context(|| { + format!("request attempt {} failed", attempt_count + 1) + }) { + // the is_success check earlier should have taken care of this already. + Ok(x) => Ok(x), + Err(as_err) => Err(backoff::Error::Permanent(Err(as_err))), + } + } + RetryCheck::Retry => { + match x.error_for_status().with_context(|| { + format!("request attempt {} failed", attempt_count + 1) + }) { + // the is_success check earlier should have taken care of this already. + Ok(x) => Ok(x), + Err(as_err) => { + if attempt_count >= max_retry { + Err(backoff::Error::Permanent(Err(as_err))) + } else { + Err(backoff::Error::Transient(Err(as_err))) + } + } + } } } } @@ -103,38 +129,40 @@ pub async fn send_retry_reqwest Result + Sen #[async_trait] pub trait SendRetry { - async fn send_retry( + async fn send_retry( self, - fail_fast_status: Vec, - extra_success_status: Vec, + check_status: R, retry_period: Duration, max_retry: usize, - ) -> Result; + ) -> Result + where + R: Fn(StatusCode) -> RetryCheck + Send + Sync; async fn send_retry_default(self) -> Result; } #[async_trait] impl SendRetry for reqwest::RequestBuilder { async fn send_retry_default(self) -> Result { - self.send_retry(vec![], vec![], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS) + self.send_retry(always_retry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS) .await } - async fn send_retry( + async fn send_retry( self, - fail_fast_status: Vec, - extra_success_status: Vec, + check_status: R, retry_period: Duration, max_retry: usize, - ) -> Result { + ) -> Result + where + R: Fn(StatusCode) -> RetryCheck + Send + Sync, + { let result = send_retry_reqwest( || { self.try_clone().ok_or_else(|| { anyhow::Error::msg("This request cannot be retried because it cannot be cloned") }) }, - fail_fast_status, - extra_success_status, + check_status, retry_period, max_retry, ) @@ -148,8 +176,19 @@ impl SendRetry for reqwest::RequestBuilder { mod test { use super::*; + fn always_fail(_: StatusCode) -> RetryCheck { + RetryCheck::Fail + } + + fn succeed_400(code: StatusCode) -> RetryCheck { + match code { + StatusCode::BAD_REQUEST => RetryCheck::Succeed, + _ => RetryCheck::Retry, + } + } + #[tokio::test] - async fn retry_should_pass() -> Result<()> { + async fn retry_success() -> Result<()> { reqwest::Client::new() .get("https://httpstat.us/200") .send_retry_default() @@ -159,56 +198,84 @@ mod test { } #[tokio::test] - async fn retry_should_fail() -> Result<()> { + async fn retry_socket_failure() -> Result<()> { let invalid_url = "http://127.0.0.1:81/test.txt"; let resp = reqwest::Client::new() .get(invalid_url) - .send_retry(vec![], vec![], Duration::from_millis(1), 3) + .send_retry(always_retry, Duration::from_millis(1), 3) .await; - if let Err(err) = &resp { - let as_text = format!("{:?}", err); - assert!(as_text.contains("request attempt 4 failed")); - } else { - anyhow::bail!("response to {} was expected to fail", invalid_url); + match resp { + Ok(_) => { + anyhow::bail!("response should have failed: {}", invalid_url); + } + Err(err) => { + let as_text = format!("{:?}", err); + assert!(as_text.contains("request attempt 4 failed"), "{}", as_text); + } } Ok(()) } #[tokio::test] - async fn retry_should_fail_404() -> Result<()> { + async fn retry_fail_normal() -> Result<()> { let invalid_url = "https://httpstat.us/400"; let resp = reqwest::Client::new() .get(invalid_url) - .send_retry(vec![], vec![], Duration::from_millis(1), 1) + .send_retry(always_retry, Duration::from_millis(1), 3) .await; - if let Err(err) = &resp { - let as_text = format!("{:?}", err); - assert!(as_text.contains("request attempt 2 failed")); - } else { - anyhow::bail!("response to {} was expected to fail", invalid_url); + match resp { + Ok(result) => { + anyhow::bail!("response should have failed: {:?}", result); + } + Err(err) => { + let as_text = format!("{:?}", err); + assert!(as_text.contains("request attempt 4 failed"), "{}", as_text); + } } Ok(()) } #[tokio::test] - async fn extra_error_success() -> Result<()> { + async fn retry_fail_fast() -> Result<()> { let invalid_url = "https://httpstat.us/400"; let resp = reqwest::Client::new() .get(invalid_url) - .send_retry( - vec![], - vec![StatusCode::BAD_REQUEST], - Duration::from_millis(1), - 1, - ) + .send_retry(always_fail, Duration::from_millis(1), 3) + .await; + + assert!(resp.is_err(), "{:?}", resp); + let as_text = format!("{:?}", resp); + assert!(as_text.contains("request attempt 1 failed"), "{}", as_text); + Ok(()) + } + + #[tokio::test] + async fn retry_400_success() -> Result<()> { + let invalid_url = "https://httpstat.us/400"; + let resp = reqwest::Client::new() + .get(invalid_url) + .send_retry(succeed_400, Duration::from_millis(1), 3) .await?; assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + Ok(()) + } + + #[tokio::test] + async fn retry_400_with_retry() -> Result<()> { + let invalid_url = "https://httpstat.us/401"; + let resp = reqwest::Client::new() + .get(invalid_url) + .send_retry(succeed_400, Duration::from_millis(1), 3) + .await; + assert!(resp.is_err(), "{:?}", resp); + let as_text = format!("{:?}", resp); + assert!(as_text.contains("request attempt 4 failed"), "{}", as_text); Ok(()) } }