Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

fix checking status codes for failure #766

Merged
4 commits merged into from
Apr 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Result;
use downcast_rs::Downcast;
use onefuzz::{auth::AccessToken, http::ResponseExt, process::Output};
use reqwest::{Client, RequestBuilder, Response, StatusCode};
use reqwest_retry::{SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
use serde::Serialize;
use uuid::Uuid;

Expand Down Expand Up @@ -236,7 +236,10 @@ impl Coordinator {
let request = self.get_request_builder(request_type.clone());
let mut response = request
.send_retry(
vec![StatusCode::UNAUTHORIZED],
|code| match code {
StatusCode::UNAUTHORIZED => RetryCheck::Fail,
_ => RetryCheck::Retry,
},
DEFAULT_RETRY_PERIOD,
MAX_RETRY_ATTEMPTS,
)
Expand Down
7 changes: 5 additions & 2 deletions src/agent/onefuzz/src/syncdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use anyhow::{Context, Result};
use futures::stream::StreamExt;
use onefuzz_telemetry::{Event, EventData};
use reqwest::StatusCode;
use reqwest_retry::{SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
use reqwest_retry::{RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS};
use serde::{Deserialize, Serialize};
use std::{path::PathBuf, str, time::Duration};
use tokio::fs;
Expand Down Expand Up @@ -143,7 +143,10 @@ impl SyncedDir {
// https://docs.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations
.header("If-None-Match", "*")
.send_retry(
vec![StatusCode::CONFLICT],
|code| match code {
StatusCode::CONFLICT => RetryCheck::Succeed,
_ => RetryCheck::Retry,
},
DEFAULT_RETRY_PERIOD,
MAX_RETRY_ATTEMPTS,
)
Expand Down
25 changes: 14 additions & 11 deletions src/agent/onefuzz/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,28 @@ use std::path::Path;

use anyhow::Result;
use futures::stream::TryStreamExt;
use reqwest as r;
use reqwest::{Body, Client, Response, StatusCode, Url};
use reqwest_retry::{
send_retry_reqwest_default, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS,
send_retry_reqwest_default, RetryCheck, SendRetry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS,
};
use serde::Serialize;
use tokio::{fs, io};
use tokio_util::codec;

#[derive(Clone)]
pub struct BlobUploader {
client: r::Client,
url: r::Url,
client: Client,
url: Url,
}

impl BlobUploader {
pub fn new(url: r::Url) -> Self {
let client = r::Client::new();
pub fn new(url: Url) -> Self {
let client = Client::new();

Self { client, url }
}

pub async fn upload(&mut self, file_path: impl AsRef<Path>) -> Result<r::Response> {
pub async fn upload(&mut self, file_path: impl AsRef<Path>) -> Result<Response> {
let file_path = file_path.as_ref();

let file_name = file_path.file_name().unwrap().to_str().unwrap();
Expand All @@ -47,13 +47,16 @@ impl BlobUploader {
.client
.head(url.clone())
.send_retry(
vec![reqwest::StatusCode::NOT_FOUND],
|code| match code {
StatusCode::NOT_FOUND => RetryCheck::Fail,
_ => RetryCheck::Retry,
},
DEFAULT_RETRY_PERIOD,
MAX_RETRY_ATTEMPTS,
)
.await
{
if head.status() == reqwest::StatusCode::OK {
if head.status() == StatusCode::OK {
return Ok(head);
}
}
Expand All @@ -71,7 +74,7 @@ impl BlobUploader {
.put(url.clone())
.header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob")
.body(r::Body::wrap_stream(file_stream));
.body(Body::wrap_stream(file_stream));

Ok(request_builder)
})
Expand All @@ -84,7 +87,7 @@ impl BlobUploader {
&mut self,
data: D,
name: impl AsRef<str>,
) -> Result<r::Response> {
) -> Result<Response> {
let url = {
let url_path = self.url.path();
let blob_path = format!("{}/{}", url_path, name.as_ref());
Expand Down
177 changes: 150 additions & 27 deletions src/agent/reqwest-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,40 @@ use std::{
pub const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(5);
pub const MAX_RETRY_ATTEMPTS: usize = 5;

pub enum RetryCheck {
Retry,
Fail,
Succeed,
}

fn always_retry(_: StatusCode) -> RetryCheck {
RetryCheck::Retry
}

pub async fn send_retry_reqwest_default<
F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync,
>(
build_request: F,
) -> Result<Response> {
send_retry_reqwest(build_request, [], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS).await
send_retry_reqwest(
build_request,
|_| RetryCheck::Retry,
DEFAULT_RETRY_PERIOD,
MAX_RETRY_ATTEMPTS,
)
.await
}

pub async fn send_retry_reqwest<F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync>(
pub async fn send_retry_reqwest<F, R>(
build_request: F,
fail_fast_status: impl AsRef<[StatusCode]>,
check_status: R,
retry_period: Duration,
max_retry: usize,
) -> Result<Response> {
) -> Result<Response>
where
F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync,
R: Fn(StatusCode) -> RetryCheck + Send + Sync,
{
let counter = AtomicUsize::new(0);
let op = || async {
let attempt_count = counter.fetch_add(1, Ordering::SeqCst);
Expand All @@ -49,11 +69,35 @@ pub async fn send_retry_reqwest<F: Fn() -> Result<reqwest::RequestBuilder> + Sen
if x.status().is_success() {
Ok(x)
} else {
let fail_fast = fail_fast_status.as_ref().contains(&x.status());
if attempt_count >= max_retry || fail_fast {
Err(backoff::Error::Permanent(Ok(x)))
} else {
Err(backoff::Error::Transient(Ok(x)))
let status = x.status();
let result = check_status(status);

match result {
RetryCheck::Succeed => Ok(x),
RetryCheck::Fail => {
match x.error_for_status().with_context(|| {
format!("request attempt {} failed", attempt_count + 1)
}) {
// the is_success check earlier should have taken care of this already.
Ok(x) => Ok(x),
Err(as_err) => Err(backoff::Error::Permanent(Err(as_err))),
}
}
RetryCheck::Retry => {
match x.error_for_status().with_context(|| {
format!("request attempt {} failed", attempt_count + 1)
}) {
// the is_success check earlier should have taken care of this already.
Ok(x) => Ok(x),
Err(as_err) => {
if attempt_count >= max_retry {
Err(backoff::Error::Permanent(Err(as_err)))
} else {
Err(backoff::Error::Transient(Err(as_err)))
}
}
}
}
}
}
}
Expand Down Expand Up @@ -85,35 +129,40 @@ pub async fn send_retry_reqwest<F: Fn() -> Result<reqwest::RequestBuilder> + Sen

#[async_trait]
pub trait SendRetry {
async fn send_retry(
async fn send_retry<R>(
self,
fail_fast_status: Vec<StatusCode>,
check_status: R,
retry_period: Duration,
max_retry: usize,
) -> Result<Response>;
) -> Result<Response>
where
R: Fn(StatusCode) -> RetryCheck + Send + Sync;
async fn send_retry_default(self) -> Result<Response>;
}

#[async_trait]
impl SendRetry for reqwest::RequestBuilder {
async fn send_retry_default(self) -> Result<Response> {
self.send_retry(vec![], DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS)
self.send_retry(always_retry, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS)
.await
}

async fn send_retry(
async fn send_retry<R>(
self,
fail_fast_status: Vec<StatusCode>,
check_status: R,
retry_period: Duration,
max_retry: usize,
) -> Result<Response> {
) -> Result<Response>
where
R: Fn(StatusCode) -> RetryCheck + Send + Sync,
{
let result = send_retry_reqwest(
|| {
self.try_clone().ok_or_else(|| {
anyhow::Error::msg("This request cannot be retried because it cannot be cloned")
})
},
fail_fast_status,
check_status,
retry_period,
max_retry,
)
Expand All @@ -127,32 +176,106 @@ impl SendRetry for reqwest::RequestBuilder {
mod test {
use super::*;

fn always_fail(_: StatusCode) -> RetryCheck {
RetryCheck::Fail
}

fn succeed_400(code: StatusCode) -> RetryCheck {
match code {
StatusCode::BAD_REQUEST => RetryCheck::Succeed,
_ => RetryCheck::Retry,
}
}

#[tokio::test]
async fn retry_should_pass() -> Result<()> {
async fn retry_success() -> Result<()> {
reqwest::Client::new()
.get("https://www.microsoft.com")
.get("https://httpstat.us/200")
.send_retry_default()
.await?
.error_for_status()?;

Ok(())
}

#[tokio::test]
async fn retry_should_fail() -> Result<()> {
async fn retry_socket_failure() -> Result<()> {
let invalid_url = "http://127.0.0.1:81/test.txt";
let resp = reqwest::Client::new()
.get(invalid_url)
.send_retry(vec![], Duration::from_millis(1), 3)
.send_retry(always_retry, Duration::from_millis(1), 3)
.await;

if let Err(err) = &resp {
let as_text = format!("{:?}", err);
assert!(as_text.contains("request attempt 4 failed"));
} else {
anyhow::bail!("response to {} was expected to fail", invalid_url);
match resp {
Ok(_) => {
anyhow::bail!("response should have failed: {}", invalid_url);
}
Err(err) => {
let as_text = format!("{:?}", err);
assert!(as_text.contains("request attempt 4 failed"), "{}", as_text);
}
}

Ok(())
}

#[tokio::test]
async fn retry_fail_normal() -> Result<()> {
let invalid_url = "https://httpstat.us/400";
let resp = reqwest::Client::new()
.get(invalid_url)
.send_retry(always_retry, Duration::from_millis(1), 3)
.await;

match resp {
Ok(result) => {
anyhow::bail!("response should have failed: {:?}", result);
}
Err(err) => {
let as_text = format!("{:?}", err);
assert!(as_text.contains("request attempt 4 failed"), "{}", as_text);
}
}

Ok(())
}

#[tokio::test]
async fn retry_fail_fast() -> Result<()> {
let invalid_url = "https://httpstat.us/400";
let resp = reqwest::Client::new()
.get(invalid_url)
.send_retry(always_fail, Duration::from_millis(1), 3)
.await;

assert!(resp.is_err(), "{:?}", resp);
let as_text = format!("{:?}", resp);
assert!(as_text.contains("request attempt 1 failed"), "{}", as_text);
Ok(())
}

#[tokio::test]
async fn retry_400_success() -> Result<()> {
let invalid_url = "https://httpstat.us/400";
let resp = reqwest::Client::new()
.get(invalid_url)
.send_retry(succeed_400, Duration::from_millis(1), 3)
.await?;

assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
Ok(())
}

#[tokio::test]
async fn retry_400_with_retry() -> Result<()> {
let invalid_url = "https://httpstat.us/401";
let resp = reqwest::Client::new()
.get(invalid_url)
.send_retry(succeed_400, Duration::from_millis(1), 3)
.await;

assert!(resp.is_err(), "{:?}", resp);
let as_text = format!("{:?}", resp);
assert!(as_text.contains("request attempt 4 failed"), "{}", as_text);
Ok(())
}
}