Skip to content

Commit

Permalink
Add retries to NetDownload intrinsic. (#16798)
Browse files Browse the repository at this point in the history
Closes #6818

For now, hard-code a retry strategy of 10ms, 100ms, 1s, 10s.

[ci skip-build-wheels]
  • Loading branch information
danxmoran authored Sep 13, 2022
1 parent 68979e1 commit f72b8c0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
12 changes: 12 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ tempfile = "3"
testutil_mock = { package = "mock", path = "testutil/mock" }
time = "0.3"
tokio = { version = "1.16", features = ["macros", "rt-multi-thread"] }
tokio-retry = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
tryfuture = { path = "tryfuture" }
ui = { path = "ui" }
Expand Down
51 changes: 32 additions & 19 deletions src/rust/engine/src/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use bytes::{BufMut, Bytes};
use futures::stream::StreamExt;
use humansize::{file_size_opts, FileSize};
use reqwest::Error;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::RetryIf;
use url::Url;

use crate::context::Core;
Expand All @@ -26,30 +28,41 @@ struct NetDownload {

impl NetDownload {
async fn start(core: &Arc<Core>, url: Url, file_name: String) -> Result<NetDownload, String> {
// TODO: Retry failures
let response = core
let try_download = || async {
core
.http_client
.get(url.clone())
.send()
.await
.map_err(|err| format!("Error downloading file: {}", err))?;
.map_err(|err| (format!("Error downloading file: {}", err), true))
.and_then(|res|
// Handle common HTTP errors.
if res.status().is_server_error() {
Err((format!(
"Server error ({}) downloading file {} from {}",
res.status().as_str(),
file_name,
url,
), true))
} else if res.status().is_client_error() {
Err((format!(
"Client error ({}) downloading file {} from {}",
res.status().as_str(),
file_name,
url,
), false))
} else {
Ok(res)
})
};

// TODO: Allow the retry strategy to be configurable?
// For now we retry after 10ms, 100ms, 1s, and 10s.
let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(4);
let response = RetryIf::spawn(retry_strategy, try_download, |err: &(String, bool)| err.1)
.await
.map_err(|(err, _)| err)?;

// Handle common HTTP errors.
if response.status().is_server_error() {
return Err(format!(
"Server error ({}) downloading file {} from {}",
response.status().as_str(),
file_name,
url,
));
} else if response.status().is_client_error() {
return Err(format!(
"Client error ({}) downloading file {} from {}",
response.status().as_str(),
file_name,
url,
));
}
let byte_stream = Pin::new(Box::new(response.bytes_stream()));
Ok(NetDownload {
stream: byte_stream,
Expand Down

0 comments on commit f72b8c0

Please sign in to comment.