Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

fix agent retry on connection level failures #623

Merged
8 commits merged into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/agent/reqwest-retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "1.0"
async-trait = "0.1"
reqwest = { version = "0.10", features = ["json", "stream"] }
backoff = { version = "0.2", features = ["async-std"] }
log = "0.4"
onefuzz-telemetry = { path = "../onefuzz-telemetry" }

[dev-dependencies]
tokio = { version = "0.2" }
tokio = { version = "0.2" , features=["macros"] }
80 changes: 53 additions & 27 deletions src/agent/reqwest-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use anyhow::Result;
use async_trait::async_trait;
use backoff::{self, future::FutureOperation, ExponentialBackoff};
use onefuzz_telemetry::warn;
use reqwest::{Response, StatusCode};
use std::{
sync::atomic::{AtomicI32, Ordering},
Expand All @@ -16,30 +17,33 @@ use std::io::ErrorKind;
const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2);
const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30);
const MAX_RETRY_ATTEMPTS: i32 = 5;
const MAX_RETRY_ERROR_MESSAGE: &str = "Maximum number of attempts reached for this request";

fn to_backoff_response(
result: Result<Response, reqwest::Error>,
) -> Result<Response, backoff::Error<anyhow::Error>> {
fn is_transient_socket_error(error: &reqwest::Error) -> bool {
let source = error.source();
if let Some(err) = source {
if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
match io_error.kind() {
ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::ConnectionRefused
| ErrorKind::TimedOut
| ErrorKind::NotConnected => return true,
_ => (),
fn is_transient_socket_error(error: &reqwest::Error) -> bool {
let source = error.source();
if let Some(err) = source {
if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
match io_error.kind() {
ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::ConnectionRefused
| ErrorKind::TimedOut
| ErrorKind::NotConnected => {
return true;
}
_ => (),
}
}
false
}
false
}

fn to_backoff_response(
result: Result<Response, reqwest::Error>,
) -> Result<Response, backoff::Error<anyhow::Error>> {
match result {
Err(error) => {
if is_transient_socket_error(&error) {
if error.is_connect() || is_transient_socket_error(&error) {
Err(backoff::Error::Transient(anyhow::Error::from(error)))
} else {
Err(backoff::Error::Permanent(anyhow::Error::from(error)))
Expand Down Expand Up @@ -91,12 +95,14 @@ pub async fn send_retry_reqwest<
let op = || async {
if counter.fetch_add(1, Ordering::SeqCst) >= max_retry {
Result::<Response, backoff::Error<anyhow::Error>>::Err(backoff::Error::Permanent(
anyhow::Error::msg("Maximum number of attempts reached for this request"),
anyhow::Error::msg(MAX_RETRY_ERROR_MESSAGE),
))
} else {
let request = build_request().map_err(backoff::Error::Permanent)?;
let response = request.send().await;
Result::<Response, backoff::Error<anyhow::Error>>::Ok(error_mapper(response)?)
let mapped = error_mapper(response)?;

Result::<Response, backoff::Error<anyhow::Error>>::Ok(mapped)
}
};
let result = op
Expand All @@ -107,7 +113,7 @@ pub async fn send_retry_reqwest<
max_elapsed_time: Some(max_elapsed_time),
..ExponentialBackoff::default()
},
|err, _| println!("Transient error: {}", err),
|err, _| warn!("transient http error: {}", err),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the URL to the message here? It might be helpful when debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the URL isn't easily accessible here. In this context, we only have a request builder, which we'd have to `try_clone` to get access to the URL.

I agree that it would be valuable. Perhaps a follow-up can refactor this to make it accessible?

)
.await?;
Ok(result)
Expand Down Expand Up @@ -173,17 +179,37 @@ impl SendRetry for reqwest::RequestBuilder {
mod test {
use super::*;

// TODO: convert to feature-gated integration test.
#[ignore]
#[tokio::test]
async fn empty_stack() -> Result<()> {
let resp = reqwest::Client::new()
.get("http://localhost:5000/api/testGet")
async fn retry_should_pass() -> Result<()> {
reqwest::Client::new()
.get("https://www.microsoft.com")
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
.send_retry_default()
.await?;
println!("{:?}", resp);
.await?
.error_for_status()?;

Ok(())
}

#[tokio::test]
async fn retry_should_fail() -> Result<()> {
let invalid_url = "http://localhost:81/test.txt";
let resp = reqwest::Client::new()
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
.get(invalid_url)
.send_retry(
Duration::from_secs(1),
Duration::from_secs(3),
3i32,
to_backoff_response,
)
.await;

if let Err(err) = &resp {
let as_text = format!("{}", err);
assert!(as_text.contains("Maximum number of attempts reached for this request"));
} else {
bail!("response to {} was expected to fail", invalid_url);
}
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved

assert!(resp.error_for_status().is_err());
Ok(())
}
}