From f2a9103b3bd9cae80f81faa908c9005c6436856b Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Mon, 12 Sep 2022 23:51:38 -0400 Subject: [PATCH] Add retries to NetDownload intrinsic. (#16798) Closes #6818 For now, hard-code the retry strategy: up to four retries with jittered exponential backoff, where the delay before each retry is randomized up to a ceiling of 10ms, 100ms, 1s, and 10s respectively. Only request errors and server (5xx) responses are retried; client (4xx) responses fail immediately. [ci skip-build-wheels] --- src/rust/engine/Cargo.lock | 12 ++++++++ src/rust/engine/Cargo.toml | 1 + src/rust/engine/src/downloads.rs | 51 ++++++++++++++++++++------------ 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index ecddee5219b..1aea09c1fdb 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -724,6 +724,7 @@ dependencies = [ "testutil", "time 0.3.9", "tokio", + "tokio-retry", "tokio-util 0.7.2", "tryfuture", "ui", @@ -3312,6 +3313,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project 1.0.8", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.22.0" diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 0034fe808c1..1253866ba63 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -151,6 +151,7 @@ tempfile = "3" testutil_mock = { package = "mock", path = "testutil/mock" } time = "0.3" tokio = { version = "1.16", features = ["macros", "rt-multi-thread"] } +tokio-retry = "0.3" tokio-util = { version = "0.7", features = ["io"] } tryfuture = { path = "tryfuture" } ui = { path = "ui" } diff --git a/src/rust/engine/src/downloads.rs b/src/rust/engine/src/downloads.rs index 44891676c08..6ef4732c64d 100644 --- a/src/rust/engine/src/downloads.rs +++ b/src/rust/engine/src/downloads.rs @@ -10,6 +10,8 @@ use bytes::{BufMut, Bytes}; use futures::stream::StreamExt; use humansize::{file_size_opts, FileSize}; use reqwest::Error; +use 
tokio_retry::strategy::{jitter, ExponentialBackoff}; +use tokio_retry::RetryIf; use url::Url; use crate::context::Core; @@ -26,30 +28,41 @@ struct NetDownload { impl NetDownload { async fn start(core: &Arc, url: Url, file_name: String) -> Result { - // TODO: Retry failures - let response = core + let try_download = || async { + core .http_client .get(url.clone()) .send() .await - .map_err(|err| format!("Error downloading file: {}", err))?; + .map_err(|err| (format!("Error downloading file: {}", err), true)) + .and_then(|res| + // Handle common HTTP errors. + if res.status().is_server_error() { + Err((format!( + "Server error ({}) downloading file {} from {}", + res.status().as_str(), + file_name, + url, + ), true)) + } else if res.status().is_client_error() { + Err((format!( + "Client error ({}) downloading file {} from {}", + res.status().as_str(), + file_name, + url, + ), false)) + } else { + Ok(res) + }) + }; + + // TODO: Allow the retry strategy to be configurable? + // For now we retry after 10ms, 100ms, 1s, and 10s. + let retry_strategy = ExponentialBackoff::from_millis(10).map(jitter).take(4); + let response = RetryIf::spawn(retry_strategy, try_download, |err: &(String, bool)| err.1) + .await + .map_err(|(err, _)| err)?; - // Handle common HTTP errors. - if response.status().is_server_error() { - return Err(format!( - "Server error ({}) downloading file {} from {}", - response.status().as_str(), - file_name, - url, - )); - } else if response.status().is_client_error() { - return Err(format!( - "Client error ({}) downloading file {} from {}", - response.status().as_str(), - file_name, - url, - )); - } let byte_stream = Pin::new(Box::new(response.bytes_stream())); Ok(NetDownload { stream: byte_stream,