From b5d772f303588ce8057e80f8f3958c098ebf4bcc Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Wed, 22 Mar 2023 11:20:27 -0700 Subject: [PATCH 1/6] Split the `cargo::util::network` module into submodules This is intended to help grow with more stuff. --- src/cargo/core/package.rs | 2 +- src/cargo/sources/git/oxide.rs | 2 +- src/cargo/sources/git/utils.rs | 2 +- src/cargo/sources/registry/http_remote.rs | 2 +- src/cargo/util/network/mod.rs | 37 +++++++++++++++++++ .../util/{network.rs => network/retry.rs} | 37 ++----------------- 6 files changed, 44 insertions(+), 38 deletions(-) create mode 100644 src/cargo/util/network/mod.rs rename src/cargo/util/{network.rs => network/retry.rs} (83%) diff --git a/src/cargo/core/package.rs b/src/cargo/core/package.rs index 952b7cba26d..98d18c2efb5 100644 --- a/src/cargo/core/package.rs +++ b/src/cargo/core/package.rs @@ -28,7 +28,7 @@ use crate::ops; use crate::util::config::PackageCacheLock; use crate::util::errors::{CargoResult, HttpNotSuccessful}; use crate::util::interning::InternedString; -use crate::util::network::Retry; +use crate::util::network::retry::Retry; use crate::util::{self, internal, Config, Progress, ProgressStyle}; pub const MANIFEST_PREAMBLE: &str = "\ diff --git a/src/cargo/sources/git/oxide.rs b/src/cargo/sources/git/oxide.rs index 56d7f820b9e..0270579da75 100644 --- a/src/cargo/sources/git/oxide.rs +++ b/src/cargo/sources/git/oxide.rs @@ -29,7 +29,7 @@ pub fn with_retry_and_progress( ) -> CargoResult<()> { std::thread::scope(|s| { let mut progress_bar = Progress::new("Fetch", config); - network::with_retry(config, || { + network::retry::with_retry(config, || { let progress_root: Arc = gix::progress::tree::root::Options { initial_capacity: 10, diff --git a/src/cargo/sources/git/utils.rs b/src/cargo/sources/git/utils.rs index 17da5e59598..c7fce1f5e21 100644 --- a/src/cargo/sources/git/utils.rs +++ b/src/cargo/sources/git/utils.rs @@ -739,7 +739,7 @@ pub fn with_fetch_options( let ssh_config = 
config.net_config()?.ssh.as_ref(); let config_known_hosts = ssh_config.and_then(|ssh| ssh.known_hosts.as_ref()); let diagnostic_home_config = config.diagnostic_home_config(); - network::with_retry(config, || { + network::retry::with_retry(config, || { with_authentication(config, url, git_config, |f| { let port = Url::parse(url).ok().and_then(|url| url.port()); let mut last_update = Instant::now(); diff --git a/src/cargo/sources/registry/http_remote.rs b/src/cargo/sources/registry/http_remote.rs index e41c385a60b..45c45be72a1 100644 --- a/src/cargo/sources/registry/http_remote.rs +++ b/src/cargo/sources/registry/http_remote.rs @@ -8,7 +8,7 @@ use crate::sources::registry::download; use crate::sources::registry::MaybeLock; use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData}; use crate::util::errors::{CargoResult, HttpNotSuccessful}; -use crate::util::network::Retry; +use crate::util::network::retry::Retry; use crate::util::{auth, Config, Filesystem, IntoUrl, Progress, ProgressStyle}; use anyhow::Context; use cargo_util::paths; diff --git a/src/cargo/util/network/mod.rs b/src/cargo/util/network/mod.rs new file mode 100644 index 00000000000..76dba470a7e --- /dev/null +++ b/src/cargo/util/network/mod.rs @@ -0,0 +1,37 @@ +//! Utilities for networking. + +use std::task::Poll; + +pub mod retry; + +pub trait PollExt { + fn expect(self, msg: &str) -> T; +} + +impl PollExt for Poll { + #[track_caller] + fn expect(self, msg: &str) -> T { + match self { + Poll::Ready(val) => val, + Poll::Pending => panic!("{}", msg), + } + } +} + +// When dynamically linked against libcurl, we want to ignore some failures +// when using old versions that don't support certain features. +#[macro_export] +macro_rules! 
try_old_curl { + ($e:expr, $msg:expr) => { + let result = $e; + if cfg!(target_os = "macos") { + if let Err(e) = result { + warn!("ignoring libcurl {} error: {}", $msg, e); + } + } else { + result.with_context(|| { + anyhow::format_err!("failed to enable {}, is curl not built right?", $msg) + })?; + } + }; +} diff --git a/src/cargo/util/network.rs b/src/cargo/util/network/retry.rs similarity index 83% rename from src/cargo/util/network.rs rename to src/cargo/util/network/retry.rs index 70c38b6d42b..be29c0074fe 100644 --- a/src/cargo/util/network.rs +++ b/src/cargo/util/network/retry.rs @@ -1,22 +1,9 @@ +//! Utilities for retrying a network operation. + use anyhow::Error; use crate::util::errors::{CargoResult, HttpNotSuccessful}; use crate::util::Config; -use std::task::Poll; - -pub trait PollExt { - fn expect(self, msg: &str) -> T; -} - -impl PollExt for Poll { - #[track_caller] - fn expect(self, msg: &str) -> T { - match self { - Poll::Ready(val) => val, - Poll::Pending => panic!("{}", msg), - } - } -} pub struct Retry<'a> { config: &'a Config, @@ -105,7 +92,7 @@ fn maybe_spurious(err: &Error) -> bool { /// # let download_something = || return Ok(()); /// # let config = Config::default().unwrap(); /// use cargo::util::network; -/// let cargo_result = network::with_retry(&config, || download_something()); +/// let cargo_result = network::retry::with_retry(&config, || download_something()); /// ``` pub fn with_retry(config: &Config, mut callback: F) -> CargoResult where @@ -119,24 +106,6 @@ where } } -// When dynamically linked against libcurl, we want to ignore some failures -// when using old versions that don't support certain features. -#[macro_export] -macro_rules! 
try_old_curl { - ($e:expr, $msg:expr) => { - let result = $e; - if cfg!(target_os = "macos") { - if let Err(e) = result { - warn!("ignoring libcurl {} error: {}", $msg, e); - } - } else { - result.with_context(|| { - anyhow::format_err!("failed to enable {}, is curl not built right?", $msg) - })?; - } - }; -} - #[test] fn with_retry_repeats_the_call_then_works() { use crate::core::Shell; From c38e050fc6a823330b549c8a3db23ba4bf872c24 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Thu, 23 Mar 2023 05:20:25 -0700 Subject: [PATCH 2/6] Allow RegistryBuilder responder URLs to be a String This allows tests to generate dynamic URLs for custom responders. --- crates/cargo-test-support/src/registry.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/cargo-test-support/src/registry.rs b/crates/cargo-test-support/src/registry.rs index e8b4342c78f..41fac3847a5 100644 --- a/crates/cargo-test-support/src/registry.rs +++ b/crates/cargo-test-support/src/registry.rs @@ -97,7 +97,7 @@ pub struct RegistryBuilder { /// Write the registry in configuration. configure_registry: bool, /// API responders. - custom_responders: HashMap<&'static str, Box Response>>, + custom_responders: HashMap Response>>, /// If nonzero, the git index update to be delayed by the given number of seconds. 
delayed_index_update: usize, } @@ -167,10 +167,11 @@ impl RegistryBuilder { #[must_use] pub fn add_responder Response>( mut self, - url: &'static str, + url: impl Into, responder: R, ) -> Self { - self.custom_responders.insert(url, Box::new(responder)); + self.custom_responders + .insert(url.into(), Box::new(responder)); self } @@ -601,7 +602,7 @@ pub struct HttpServer { addr: SocketAddr, token: Token, auth_required: bool, - custom_responders: HashMap<&'static str, Box Response>>, + custom_responders: HashMap Response>>, delayed_index_update: usize, } @@ -621,10 +622,7 @@ impl HttpServer { api_path: PathBuf, token: Token, auth_required: bool, - api_responders: HashMap< - &'static str, - Box Response>, - >, + api_responders: HashMap Response>>, delayed_index_update: usize, ) -> HttpServerHandle { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); From 6bd1209a55c81f54408496bde838c7db72f909f0 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Thu, 23 Mar 2023 05:33:20 -0700 Subject: [PATCH 3/6] Add delays to network retries. 
--- Cargo.toml | 1 + src/cargo/core/package.rs | 118 ++++++----- src/cargo/sources/registry/http_remote.rs | 69 +++++-- src/cargo/util/network/mod.rs | 1 + src/cargo/util/network/retry.rs | 101 ++++++++-- src/cargo/util/network/sleep.rs | 94 +++++++++ src/doc/src/reference/config.md | 4 +- tests/testsuite/git_auth.rs | 1 + tests/testsuite/registry.rs | 226 +++++++++++++++++++++- 9 files changed, 527 insertions(+), 88 deletions(-) create mode 100644 src/cargo/util/network/sleep.rs diff --git a/Cargo.toml b/Cargo.toml index a9c045edbe4..27e552a3c4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,6 +58,7 @@ os_info = "3.5.0" pasetors = { version = "0.6.4", features = ["v3", "paserk", "std", "serde"] } pathdiff = "0.2" pretty_env_logger = { version = "0.4", optional = true } +rand = "0.8.5" rustfix = "0.6.0" semver = { version = "1.0.3", features = ["serde"] } serde = { version = "1.0.123", features = ["derive"] } diff --git a/src/cargo/core/package.rs b/src/cargo/core/package.rs index 98d18c2efb5..912c80cc461 100644 --- a/src/cargo/core/package.rs +++ b/src/cargo/core/package.rs @@ -28,7 +28,8 @@ use crate::ops; use crate::util::config::PackageCacheLock; use crate::util::errors::{CargoResult, HttpNotSuccessful}; use crate::util::interning::InternedString; -use crate::util::network::retry::Retry; +use crate::util::network::retry::{Retry, RetryResult}; +use crate::util::network::sleep::SleepTracker; use crate::util::{self, internal, Config, Progress, ProgressStyle}; pub const MANIFEST_PREAMBLE: &str = "\ @@ -319,6 +320,8 @@ pub struct Downloads<'a, 'cfg> { /// Set of packages currently being downloaded. This should stay in sync /// with `pending`. pending_ids: HashSet, + /// Downloads that have failed and are waiting to retry again later. + sleeping: SleepTracker<(Download<'cfg>, Easy)>, /// The final result of each download. A pair `(token, result)`. 
This is a /// temporary holding area, needed because curl can report multiple /// downloads at once, but the main loop (`wait`) is written to only @@ -442,6 +445,7 @@ impl<'cfg> PackageSet<'cfg> { next: 0, pending: HashMap::new(), pending_ids: HashSet::new(), + sleeping: SleepTracker::new(), results: Vec::new(), progress: RefCell::new(Some(Progress::with_style( "Downloading", @@ -800,7 +804,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { /// Returns the number of crates that are still downloading. pub fn remaining(&self) -> usize { - self.pending.len() + self.pending.len() + self.sleeping.len() } /// Blocks the current thread waiting for a package to finish downloading. @@ -831,51 +835,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { let ret = { let timed_out = &dl.timed_out; let url = &dl.url; - dl.retry - .r#try(|| { - if let Err(e) = result { - // If this error is "aborted by callback" then that's - // probably because our progress callback aborted due to - // a timeout. We'll find out by looking at the - // `timed_out` field, looking for a descriptive message. - // If one is found we switch the error code (to ensure - // it's flagged as spurious) and then attach our extra - // information to the error. - if !e.is_aborted_by_callback() { - return Err(e.into()); - } + dl.retry.r#try(|| { + if let Err(e) = result { + // If this error is "aborted by callback" then that's + // probably because our progress callback aborted due to + // a timeout. We'll find out by looking at the + // `timed_out` field, looking for a descriptive message. + // If one is found we switch the error code (to ensure + // it's flagged as spurious) and then attach our extra + // information to the error. 
+ if !e.is_aborted_by_callback() { + return Err(e.into()); + } - return Err(match timed_out.replace(None) { - Some(msg) => { - let code = curl_sys::CURLE_OPERATION_TIMEDOUT; - let mut err = curl::Error::new(code); - err.set_extra(msg); - err - } - None => e, + return Err(match timed_out.replace(None) { + Some(msg) => { + let code = curl_sys::CURLE_OPERATION_TIMEDOUT; + let mut err = curl::Error::new(code); + err.set_extra(msg); + err } - .into()); + None => e, } + .into()); + } - let code = handle.response_code()?; - if code != 200 && code != 0 { - let url = handle.effective_url()?.unwrap_or(url); - return Err(HttpNotSuccessful { - code, - url: url.to_string(), - body: data, - } - .into()); + let code = handle.response_code()?; + if code != 200 && code != 0 { + let url = handle.effective_url()?.unwrap_or(url); + return Err(HttpNotSuccessful { + code, + url: url.to_string(), + body: data, } - Ok(data) - }) - .with_context(|| format!("failed to download from `{}`", dl.url))? + .into()); + } + Ok(data) + }) }; match ret { - Some(data) => break (dl, data), - None => { - self.pending_ids.insert(dl.id); - self.enqueue(dl, handle)? + RetryResult::Success(data) => break (dl, data), + RetryResult::Err(e) => { + return Err(e.context(format!("failed to download from `{}`", dl.url))) + } + RetryResult::Retry(sleep) => { + debug!("download retry {} for {sleep}ms", dl.url); + self.sleeping.push(sleep, (dl, handle)); } } }; @@ -963,6 +968,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { // actually block waiting for I/O to happen, which we achieve with the // `wait` method on `multi`. 
loop { + self.add_sleepers()?; let n = tls::set(self, || { self.set .multi @@ -985,17 +991,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { if let Some(pair) = results.pop() { break Ok(pair); } - assert!(!self.pending.is_empty()); - let min_timeout = Duration::new(1, 0); - let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout); - let timeout = timeout.min(min_timeout); - self.set - .multi - .wait(&mut [], timeout) - .with_context(|| "failed to wait on curl `Multi`")?; + assert_ne!(self.remaining(), 0); + if self.pending.is_empty() { + let delay = self.sleeping.time_to_next().unwrap(); + debug!("sleeping main thread for {delay:?}"); + std::thread::sleep(delay); + } else { + let min_timeout = Duration::new(1, 0); + let timeout = self.set.multi.get_timeout()?.unwrap_or(min_timeout); + let timeout = timeout.min(min_timeout); + self.set + .multi + .wait(&mut [], timeout) + .with_context(|| "failed to wait on curl `Multi`")?; + } } } + fn add_sleepers(&mut self) -> CargoResult<()> { + for (dl, handle) in self.sleeping.to_retry() { + self.pending_ids.insert(dl.id); + self.enqueue(dl, handle)?; + } + Ok(()) + } + fn progress(&self, token: usize, total: u64, cur: u64) -> bool { let dl = &self.pending[&token].0; dl.total.set(total); @@ -1061,7 +1081,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { return Ok(()); } } - let pending = self.pending.len(); + let pending = self.remaining(); let mut msg = if pending == 1 { format!("{} crate", pending) } else { diff --git a/src/cargo/sources/registry/http_remote.rs b/src/cargo/sources/registry/http_remote.rs index 45c45be72a1..01a8958fc16 100644 --- a/src/cargo/sources/registry/http_remote.rs +++ b/src/cargo/sources/registry/http_remote.rs @@ -8,11 +8,12 @@ use crate::sources::registry::download; use crate::sources::registry::MaybeLock; use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData}; use crate::util::errors::{CargoResult, HttpNotSuccessful}; -use crate::util::network::retry::Retry; +use 
crate::util::network::retry::{Retry, RetryResult}; +use crate::util::network::sleep::SleepTracker; use crate::util::{auth, Config, Filesystem, IntoUrl, Progress, ProgressStyle}; use anyhow::Context; use cargo_util::paths; -use curl::easy::{HttpVersion, List}; +use curl::easy::{Easy, HttpVersion, List}; use curl::multi::{EasyHandle, Multi}; use log::{debug, trace, warn}; use std::cell::RefCell; @@ -103,6 +104,8 @@ struct Downloads<'cfg> { /// Set of paths currently being downloaded. /// This should stay in sync with `pending`. pending_paths: HashSet, + /// Downloads that have failed and are waiting to retry again later. + sleeping: SleepTracker<(Download<'cfg>, Easy)>, /// The final result of each download. results: HashMap>, /// The next ID to use for creating a token (see `Download::token`). @@ -184,6 +187,7 @@ impl<'cfg> HttpRegistry<'cfg> { next: 0, pending: HashMap::new(), pending_paths: HashSet::new(), + sleeping: SleepTracker::new(), results: HashMap::new(), progress: RefCell::new(Some(Progress::with_style( "Fetch", @@ -265,6 +269,7 @@ impl<'cfg> HttpRegistry<'cfg> { }; for (token, result) in results { let (mut download, handle) = self.downloads.pending.remove(&token).unwrap(); + assert!(self.downloads.pending_paths.remove(&download.path)); let mut handle = self.multi.remove(handle)?; let data = download.data.take(); let url = self.full_url(&download.path); @@ -289,21 +294,19 @@ impl<'cfg> HttpRegistry<'cfg> { }; Ok((data, code)) }) { - Ok(Some((data, code))) => Ok(CompletedDownload { + RetryResult::Success((data, code)) => Ok(CompletedDownload { response_code: code, data, header_map: download.header_map.take(), }), - Ok(None) => { - // retry the operation - let handle = self.multi.add(handle)?; - self.downloads.pending.insert(token, (download, handle)); + RetryResult::Err(e) => Err(e), + RetryResult::Retry(sleep) => { + debug!("download retry {:?} for {sleep}ms", download.path); + self.downloads.sleeping.push(sleep, (download, handle)); continue; } - Err(e) 
=> Err(e), }; - assert!(self.downloads.pending_paths.remove(&download.path)); self.downloads.results.insert(download.path, result); self.downloads.downloads_finished += 1; } @@ -395,6 +398,25 @@ impl<'cfg> HttpRegistry<'cfg> { ))), } } + + fn add_sleepers(&mut self) -> CargoResult<()> { + for (dl, handle) in self.downloads.sleeping.to_retry() { + let mut handle = self.multi.add(handle)?; + handle.set_token(dl.token)?; + assert!( + self.downloads.pending_paths.insert(dl.path.to_path_buf()), + "path queued for download more than once" + ); + assert!( + self.downloads + .pending + .insert(dl.token, (dl, handle)) + .is_none(), + "dl token queued more than once" + ); + } + Ok(()) + } } impl<'cfg> RegistryData for HttpRegistry<'cfg> { @@ -730,6 +752,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> { loop { self.handle_completed_downloads()?; + self.add_sleepers()?; let remaining_in_multi = tls::set(&self.downloads, || { self.multi @@ -738,19 +761,25 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> { })?; trace!("{} transfers remaining", remaining_in_multi); - if remaining_in_multi == 0 { + if remaining_in_multi + self.downloads.sleeping.len() as u32 == 0 { return Ok(()); } - // We have no more replies to provide the caller with, - // so we need to wait until cURL has something new for us. - let timeout = self - .multi - .get_timeout()? - .unwrap_or_else(|| Duration::new(1, 0)); - self.multi - .wait(&mut [], timeout) - .with_context(|| "failed to wait on curl `Multi`")?; + if self.downloads.pending.is_empty() { + let delay = self.downloads.sleeping.time_to_next().unwrap(); + debug!("sleeping main thread for {delay:?}"); + std::thread::sleep(delay); + } else { + // We have no more replies to provide the caller with, + // so we need to wait until cURL has something new for us. + let timeout = self + .multi + .get_timeout()? 
+ .unwrap_or_else(|| Duration::new(1, 0)); + self.multi + .wait(&mut [], timeout) + .with_context(|| "failed to wait on curl `Multi`")?; + } } } } @@ -779,7 +808,7 @@ impl<'cfg> Downloads<'cfg> { &format!( " {} complete; {} pending", self.downloads_finished, - self.pending.len() + self.pending.len() + self.sleeping.len() ), ) } diff --git a/src/cargo/util/network/mod.rs b/src/cargo/util/network/mod.rs index 76dba470a7e..60a380343b7 100644 --- a/src/cargo/util/network/mod.rs +++ b/src/cargo/util/network/mod.rs @@ -3,6 +3,7 @@ use std::task::Poll; pub mod retry; +pub mod sleep; pub trait PollExt { fn expect(self, msg: &str) -> T; diff --git a/src/cargo/util/network/retry.rs b/src/cargo/util/network/retry.rs index be29c0074fe..388857f2219 100644 --- a/src/cargo/util/network/retry.rs +++ b/src/cargo/util/network/retry.rs @@ -1,37 +1,71 @@ //! Utilities for retrying a network operation. +use crate::util::errors::HttpNotSuccessful; +use crate::{CargoResult, Config}; use anyhow::Error; - -use crate::util::errors::{CargoResult, HttpNotSuccessful}; -use crate::util::Config; +use rand::Rng; +use std::cmp::min; +use std::time::Duration; pub struct Retry<'a> { config: &'a Config, - remaining: u32, + retries: u64, + max_retries: u64, +} + +pub enum RetryResult { + Success(T), + Err(anyhow::Error), + Retry(u64), } +/// Maximum amount of time a single retry can be delayed (milliseconds). +const MAX_RETRY_SLEEP: u64 = 10 * 1000; +/// The minimum initial amount of time a retry will be delayed (milliseconds). +/// +/// The actual amount of time will be a random value above this. +const INITIAL_RETRY_SLEEP_BASE: u64 = 500; +/// The maximum amount of additional time the initial retry will take (milliseconds). +/// +/// The initial delay will be [`INITIAL_RETRY_SLEEP_BASE`] plus a random range +/// from 0 to this value. 
+const INITIAL_RETRY_JITTER: u64 = 1000; + impl<'a> Retry<'a> { pub fn new(config: &'a Config) -> CargoResult> { Ok(Retry { config, - remaining: config.net_config()?.retry.unwrap_or(2), + retries: 0, + max_retries: config.net_config()?.retry.unwrap_or(3) as u64, }) } /// Returns `Ok(None)` for operations that should be re-tried. - pub fn r#try(&mut self, f: impl FnOnce() -> CargoResult) -> CargoResult> { + pub fn r#try(&mut self, f: impl FnOnce() -> CargoResult) -> RetryResult { match f() { - Err(ref e) if maybe_spurious(e) && self.remaining > 0 => { + Err(ref e) if maybe_spurious(e) && self.retries < self.max_retries => { let msg = format!( "spurious network error ({} tries remaining): {}", - self.remaining, + self.max_retries - self.retries, e.root_cause(), ); - self.config.shell().warn(msg)?; - self.remaining -= 1; - Ok(None) + if let Err(e) = self.config.shell().warn(msg) { + return RetryResult::Err(e); + } + self.retries += 1; + let sleep = if self.retries == 1 { + let mut rng = rand::thread_rng(); + INITIAL_RETRY_SLEEP_BASE + rng.gen_range(0..INITIAL_RETRY_JITTER) + } else { + min( + ((self.retries - 1) * 3) * 1000 + INITIAL_RETRY_SLEEP_BASE, + MAX_RETRY_SLEEP, + ) + }; + RetryResult::Retry(sleep) } - other => other.map(Some), + Err(e) => RetryResult::Err(e), + Ok(r) => RetryResult::Success(r), } } } @@ -100,8 +134,10 @@ where { let mut retry = Retry::new(config)?; loop { - if let Some(ret) = retry.r#try(&mut callback)? 
{ - return Ok(ret); + match retry.r#try(&mut callback) { + RetryResult::Success(r) => return Ok(r), + RetryResult::Err(e) => return Err(e), + RetryResult::Retry(sleep) => std::thread::sleep(Duration::from_millis(sleep)), } } } @@ -155,6 +191,43 @@ fn with_retry_finds_nested_spurious_errors() { assert!(result.is_ok()) } +#[test] +fn default_retry_schedule() { + use crate::core::Shell; + + let spurious = || -> CargoResult<()> { + Err(anyhow::Error::from(HttpNotSuccessful { + code: 500, + url: "Uri".to_string(), + body: Vec::new(), + })) + }; + let config = Config::default().unwrap(); + *config.shell() = Shell::from_write(Box::new(Vec::new())); + let mut retry = Retry::new(&config).unwrap(); + match retry.r#try(|| spurious()) { + RetryResult::Retry(sleep) => { + assert!( + sleep >= INITIAL_RETRY_SLEEP_BASE + && sleep < INITIAL_RETRY_SLEEP_BASE + INITIAL_RETRY_JITTER + ); + } + _ => panic!("unexpected non-retry"), + } + match retry.r#try(|| spurious()) { + RetryResult::Retry(sleep) => assert_eq!(sleep, 3500), + _ => panic!("unexpected non-retry"), + } + match retry.r#try(|| spurious()) { + RetryResult::Retry(sleep) => assert_eq!(sleep, 6500), + _ => panic!("unexpected non-retry"), + } + match retry.r#try(|| spurious()) { + RetryResult::Err(_) => {} + _ => panic!("unexpected non-retry"), + } +} + #[test] fn curle_http2_stream_is_spurious() { let code = curl_sys::CURLE_HTTP2_STREAM; diff --git a/src/cargo/util/network/sleep.rs b/src/cargo/util/network/sleep.rs new file mode 100644 index 00000000000..c77e056be4a --- /dev/null +++ b/src/cargo/util/network/sleep.rs @@ -0,0 +1,94 @@ +//! Utility for tracking network requests that will be retried in the future. 
+
+use core::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::time::{Duration, Instant};
+
+pub struct SleepTracker<T> {
+    heap: BinaryHeap<Sleeper<T>>,
+}
+
+struct Sleeper<T> {
+    wakeup: Instant,
+    data: T,
+}
+
+impl<T> PartialEq for Sleeper<T> {
+    fn eq(&self, other: &Sleeper<T>) -> bool {
+        self.wakeup == other.wakeup
+    }
+}
+
+impl<T> PartialOrd for Sleeper<T> {
+    fn partial_cmp(&self, other: &Sleeper<T>) -> Option<Ordering> {
+        Some(other.wakeup.cmp(&self.wakeup))
+    }
+}
+
+impl<T> Eq for Sleeper<T> {}
+
+impl<T> Ord for Sleeper<T> {
+    fn cmp(&self, other: &Sleeper<T>) -> Ordering {
+        self.wakeup.cmp(&other.wakeup)
+    }
+}
+
+impl<T> SleepTracker<T> {
+    pub fn new() -> SleepTracker<T> {
+        SleepTracker {
+            heap: BinaryHeap::new(),
+        }
+    }
+
+    /// Adds a new download that should be retried in the future.
+    pub fn push(&mut self, sleep: u64, data: T) {
+        self.heap.push(Sleeper {
+            wakeup: Instant::now()
+                .checked_add(Duration::from_millis(sleep))
+                .expect("instant should not wrap"),
+            data,
+        });
+    }
+
+    pub fn len(&self) -> usize {
+        self.heap.len()
+    }
+
+    /// Returns any downloads that are ready to go now.
+    pub fn to_retry(&mut self) -> Vec<T> {
+        let now = Instant::now();
+        let mut result = Vec::new();
+        while let Some(next) = self.heap.peek() {
+            log::debug!("sleep tracker: now={now:?} next={:?}", next.wakeup);
+            if next.wakeup < now {
+                result.push(self.heap.pop().unwrap().data);
+            } else {
+                break;
+            }
+        }
+        result
+    }
+
+    /// Returns the time when the next download is ready to go.
+    ///
+    /// Returns None if there are no sleepers remaining.
+ pub fn time_to_next(&self) -> Option { + self.heap + .peek() + .map(|s| s.wakeup.saturating_duration_since(Instant::now())) + } +} + +#[test] +fn returns_in_order() { + let mut s = SleepTracker::new(); + s.push(3, 3); + s.push(1, 1); + s.push(6, 6); + s.push(5, 5); + s.push(2, 2); + s.push(10000, 10000); + assert_eq!(s.len(), 6); + std::thread::sleep(Duration::from_millis(100)); + assert_eq!(s.to_retry(), &[1, 2, 3, 5, 6]); +} diff --git a/src/doc/src/reference/config.md b/src/doc/src/reference/config.md index 325b958ae95..6578b3cc0ab 100644 --- a/src/doc/src/reference/config.md +++ b/src/doc/src/reference/config.md @@ -110,7 +110,7 @@ user-agent = "…" # the user-agent header root = "/some/path" # `cargo install` destination directory [net] -retry = 2 # network retries +retry = 3 # network retries git-fetch-with-cli = true # use the `git` executable for git operations offline = true # do not access the network @@ -724,7 +724,7 @@ The `[net]` table controls networking configuration. ##### `net.retry` * Type: integer -* Default: 2 +* Default: 3 * Environment: `CARGO_NET_RETRY` Number of times to retry possibly spurious network errors. diff --git a/tests/testsuite/git_auth.rs b/tests/testsuite/git_auth.rs index 7c379adba9b..b6e68fa3d88 100644 --- a/tests/testsuite/git_auth.rs +++ b/tests/testsuite/git_auth.rs @@ -327,6 +327,7 @@ fn net_err_suggests_fetch_with_cli() { [UPDATING] git repository `ssh://needs-proxy.invalid/git` warning: spurious network error[..] warning: spurious network error[..] +warning: spurious network error[..] 
[ERROR] failed to get `foo` as a dependency of package `foo v0.0.0 [..]` Caused by: diff --git a/tests/testsuite/registry.rs b/tests/testsuite/registry.rs index 2d3933967d7..bf112f134ce 100644 --- a/tests/testsuite/registry.rs +++ b/tests/testsuite/registry.rs @@ -9,8 +9,10 @@ use cargo_test_support::registry::{ use cargo_test_support::{basic_manifest, project}; use cargo_test_support::{git, install::cargo_home, t}; use cargo_util::paths::remove_dir_all; +use std::fmt::Write; use std::fs::{self, File}; use std::path::Path; +use std::sync::Arc; use std::sync::Mutex; fn setup_http() -> TestRegistry { @@ -2704,7 +2706,7 @@ Caused by: } #[cargo_test] -fn sparse_retry() { +fn sparse_retry_single() { let fail_count = Mutex::new(0); let _registry = RegistryBuilder::new() .http_index() @@ -2741,10 +2743,10 @@ fn sparse_retry() { .with_stderr( "\ [UPDATING] `dummy-registry` index -warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `[..]`, got 500 +warning: spurious network error (3 tries remaining): failed to get successful HTTP response from `[..]`, got 500 body: internal server error -warning: spurious network error (1 tries remaining): failed to get successful HTTP response from `[..]`, got 500 +warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `[..]`, got 500 body: internal server error [DOWNLOADING] crates ... @@ -2757,6 +2759,224 @@ internal server error .run(); } +#[cargo_test] +fn sparse_retry_multiple() { + // Tests retry behavior of downloading lots of packages with various + // failure rates accessing the sparse index. + + // The index is the number of retries, the value is the number of packages + // that retry that number of times. Thus 50 packages succeed on first try, + // 25 on second, etc. 
+ const RETRIES: &[u32] = &[50, 25, 12, 6]; + + let pkgs: Vec<_> = RETRIES + .iter() + .enumerate() + .flat_map(|(retries, num)| { + (0..*num) + .into_iter() + .map(move |n| (retries as u32, format!("{}-{n}-{retries}", rand_prefix()))) + }) + .collect(); + + let mut builder = RegistryBuilder::new().http_index(); + let fail_counts: Arc>> = Arc::new(Mutex::new(vec![0; pkgs.len()])); + let mut cargo_toml = r#" + [package] + name = "foo" + version = "0.1.0" + + [dependencies] + "# + .to_string(); + // The expected stderr output. + let mut expected = "\ +[UPDATING] `dummy-registry` index +[DOWNLOADING] crates ... +" + .to_string(); + for (n, (retries, name)) in pkgs.iter().enumerate() { + let count_clone = fail_counts.clone(); + let retries = *retries; + let ab = &name[..2]; + let cd = &name[2..4]; + builder = builder.add_responder(format!("/index/{ab}/{cd}/{name}"), move |req, server| { + let mut fail_counts = count_clone.lock().unwrap(); + if fail_counts[n] < retries { + fail_counts[n] += 1; + server.internal_server_error(req) + } else { + server.index(req) + } + }); + write!(&mut cargo_toml, "{name} = \"1.0.0\"\n").unwrap(); + for retry in 0..retries { + let remain = 3 - retry; + write!( + &mut expected, + "warning: spurious network error ({remain} tries remaining): \ + failed to get successful HTTP response from \ + `http://127.0.0.1:[..]/{ab}/{cd}/{name}`, got 500\n\ + body:\n\ + internal server error\n" + ) + .unwrap(); + } + write!( + &mut expected, + "[DOWNLOADED] {name} v1.0.0 (registry `dummy-registry`)\n" + ) + .unwrap(); + } + let _server = builder.build(); + for (_, name) in &pkgs { + Package::new(name, "1.0.0").publish(); + } + let p = project() + .file("Cargo.toml", &cargo_toml) + .file("src/lib.rs", "") + .build(); + p.cargo("fetch").with_stderr_unordered(expected).run(); +} + +#[cargo_test] +fn dl_retry_single() { + // Tests retry behavior of downloading a package. 
+ // This tests a single package which exercises the code path that causes + // it to block. + let fail_count = Mutex::new(0); + let _server = RegistryBuilder::new() + .http_index() + .add_responder("/dl/bar/1.0.0/download", move |req, server| { + let mut fail_count = fail_count.lock().unwrap(); + if *fail_count < 2 { + *fail_count += 1; + server.internal_server_error(req) + } else { + server.dl(req) + } + }) + .build(); + Package::new("bar", "1.0.0").publish(); + let p = project() + .file( + "Cargo.toml", + r#" + [package] + name = "foo" + version = "0.1.0" + + [dependencies] + bar = "1.0" + "#, + ) + .file("src/lib.rs", "") + .build(); + p.cargo("fetch") + .with_stderr("\ +[UPDATING] `dummy-registry` index +[DOWNLOADING] crates ... +warning: spurious network error (3 tries remaining): failed to get successful HTTP response from `http://127.0.0.1:[..]/dl/bar/1.0.0/download`, got 500 +body: +internal server error +warning: spurious network error (2 tries remaining): failed to get successful HTTP response from `http://127.0.0.1:[..]/dl/bar/1.0.0/download`, got 500 +body: +internal server error +[DOWNLOADED] bar v1.0.0 (registry `dummy-registry`) +").run(); +} + +/// Creates a random prefix to randomly spread out the package names +/// to somewhat evenly distribute the different failures at different +/// points. +fn rand_prefix() -> String { + use rand::Rng; + const CHARS: &[u8] = b"abcdefghijklmnopqrstuvwxyz"; + let mut rng = rand::thread_rng(); + (0..5) + .map(|_| CHARS[rng.gen_range(0..CHARS.len())] as char) + .collect() +} + +#[cargo_test] +fn dl_retry_multiple() { + // Tests retry behavior of downloading lots of packages with various + // failure rates. + + // The index is the number of retries, the value is the number of packages + // that retry that number of times. Thus 50 packages succeed on first try, + // 25 on second, etc. 
+ const RETRIES: &[u32] = &[50, 25, 12, 6]; + + let pkgs: Vec<_> = RETRIES + .iter() + .enumerate() + .flat_map(|(retries, num)| { + (0..*num) + .into_iter() + .map(move |n| (retries as u32, format!("{}-{n}-{retries}", rand_prefix()))) + }) + .collect(); + + let mut builder = RegistryBuilder::new().http_index(); + let fail_counts: Arc>> = Arc::new(Mutex::new(vec![0; pkgs.len()])); + let mut cargo_toml = r#" + [package] + name = "foo" + version = "0.1.0" + + [dependencies] + "# + .to_string(); + // The expected stderr output. + let mut expected = "\ +[UPDATING] `dummy-registry` index +[DOWNLOADING] crates ... +" + .to_string(); + for (n, (retries, name)) in pkgs.iter().enumerate() { + let count_clone = fail_counts.clone(); + let retries = *retries; + builder = + builder.add_responder(format!("/dl/{name}/1.0.0/download"), move |req, server| { + let mut fail_counts = count_clone.lock().unwrap(); + if fail_counts[n] < retries { + fail_counts[n] += 1; + server.internal_server_error(req) + } else { + server.dl(req) + } + }); + write!(&mut cargo_toml, "{name} = \"1.0.0\"\n").unwrap(); + for retry in 0..retries { + let remain = 3 - retry; + write!( + &mut expected, + "warning: spurious network error ({remain} tries remaining): \ + failed to get successful HTTP response from \ + `http://127.0.0.1:[..]/dl/{name}/1.0.0/download`, got 500\n\ + body:\n\ + internal server error\n" + ) + .unwrap(); + } + write!( + &mut expected, + "[DOWNLOADED] {name} v1.0.0 (registry `dummy-registry`)\n" + ) + .unwrap(); + } + let _server = builder.build(); + for (_, name) in &pkgs { + Package::new(name, "1.0.0").publish(); + } + let p = project() + .file("Cargo.toml", &cargo_toml) + .file("src/lib.rs", "") + .build(); + p.cargo("fetch").with_stderr_unordered(expected).run(); +} + #[cargo_test] fn deleted_entry() { // Checks the behavior when a package is removed from the index. 
From 2f8dafe4070a53a32c7d4c1b6c75aef18105a8a3 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Sat, 25 Mar 2023 11:58:44 -0700 Subject: [PATCH 4/6] Add some more docs and comments to `SleepTracker`. --- src/cargo/util/network/sleep.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/cargo/util/network/sleep.rs b/src/cargo/util/network/sleep.rs index c77e056be4a..d4105065e29 100644 --- a/src/cargo/util/network/sleep.rs +++ b/src/cargo/util/network/sleep.rs @@ -4,12 +4,19 @@ use core::cmp::Ordering; use std::collections::BinaryHeap; use std::time::{Duration, Instant}; +/// A tracker for network requests that have failed, and are awaiting to be +/// retried in the future. pub struct SleepTracker { + /// This is a priority queue that tracks the time when the next sleeper + /// should awaken (based on the [`Sleeper::wakeup`] property). heap: BinaryHeap>, } +/// An individual network request that is waiting to be retried in the future. struct Sleeper { + /// The time when this request should be retried. wakeup: Instant, + /// Information about the network request. data: T, } @@ -21,6 +28,8 @@ impl PartialEq for Sleeper { impl PartialOrd for Sleeper { fn partial_cmp(&self, other: &Sleeper) -> Option { + // This reverses the comparison so that the BinaryHeap tracks the + // entry with the *lowest* wakeup time. Some(other.wakeup.cmp(&self.wakeup)) } } From e0d8204aed829e45835b9885dbe073d006ebc058 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Fri, 31 Mar 2023 14:21:23 -0700 Subject: [PATCH 5/6] Add `_MS` suffix to retry constants. --- src/cargo/util/network/retry.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/cargo/util/network/retry.rs b/src/cargo/util/network/retry.rs index 388857f2219..2ee2c1223ab 100644 --- a/src/cargo/util/network/retry.rs +++ b/src/cargo/util/network/retry.rs @@ -20,16 +20,16 @@ pub enum RetryResult { } /// Maximum amount of time a single retry can be delayed (milliseconds). 
-const MAX_RETRY_SLEEP: u64 = 10 * 1000; +const MAX_RETRY_SLEEP_MS: u64 = 10 * 1000; /// The minimum initial amount of time a retry will be delayed (milliseconds). /// /// The actual amount of time will be a random value above this. -const INITIAL_RETRY_SLEEP_BASE: u64 = 500; +const INITIAL_RETRY_SLEEP_BASE_MS: u64 = 500; /// The maximum amount of additional time the initial retry will take (milliseconds). /// -/// The initial delay will be [`INITIAL_RETRY_SLEEP_BASE`] plus a random range +/// The initial delay will be [`INITIAL_RETRY_SLEEP_BASE_MS`] plus a random range /// from 0 to this value. -const INITIAL_RETRY_JITTER: u64 = 1000; +const INITIAL_RETRY_JITTER_MS: u64 = 1000; impl<'a> Retry<'a> { pub fn new(config: &'a Config) -> CargoResult> { @@ -55,11 +55,11 @@ impl<'a> Retry<'a> { self.retries += 1; let sleep = if self.retries == 1 { let mut rng = rand::thread_rng(); - INITIAL_RETRY_SLEEP_BASE + rng.gen_range(0..INITIAL_RETRY_JITTER) + INITIAL_RETRY_SLEEP_BASE_MS + rng.gen_range(0..INITIAL_RETRY_JITTER_MS) } else { min( - ((self.retries - 1) * 3) * 1000 + INITIAL_RETRY_SLEEP_BASE, - MAX_RETRY_SLEEP, + ((self.retries - 1) * 3) * 1000 + INITIAL_RETRY_SLEEP_BASE_MS, + MAX_RETRY_SLEEP_MS, ) }; RetryResult::Retry(sleep) @@ -208,8 +208,8 @@ fn default_retry_schedule() { match retry.r#try(|| spurious()) { RetryResult::Retry(sleep) => { assert!( - sleep >= INITIAL_RETRY_SLEEP_BASE - && sleep < INITIAL_RETRY_SLEEP_BASE + INITIAL_RETRY_JITTER + sleep >= INITIAL_RETRY_SLEEP_BASE_MS + && sleep < INITIAL_RETRY_SLEEP_BASE_MS + INITIAL_RETRY_JITTER_MS ); } _ => panic!("unexpected non-retry"), From 4fdea658a895d5de01fd7dbe92848d50361092f2 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Fri, 31 Mar 2023 14:22:27 -0700 Subject: [PATCH 6/6] Don't place side-effect expressions in assert! macros. 
--- src/cargo/sources/registry/http_remote.rs | 31 ++++++++++------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/cargo/sources/registry/http_remote.rs b/src/cargo/sources/registry/http_remote.rs index 01a8958fc16..c0552734b33 100644 --- a/src/cargo/sources/registry/http_remote.rs +++ b/src/cargo/sources/registry/http_remote.rs @@ -269,7 +269,12 @@ impl<'cfg> HttpRegistry<'cfg> { }; for (token, result) in results { let (mut download, handle) = self.downloads.pending.remove(&token).unwrap(); - assert!(self.downloads.pending_paths.remove(&download.path)); + let was_present = self.downloads.pending_paths.remove(&download.path); + assert!( + was_present, + "expected pending_paths to contain {:?}", + download.path + ); let mut handle = self.multi.remove(handle)?; let data = download.data.take(); let url = self.full_url(&download.path); @@ -403,17 +408,10 @@ impl<'cfg> HttpRegistry<'cfg> { for (dl, handle) in self.downloads.sleeping.to_retry() { let mut handle = self.multi.add(handle)?; handle.set_token(dl.token)?; - assert!( - self.downloads.pending_paths.insert(dl.path.to_path_buf()), - "path queued for download more than once" - ); - assert!( - self.downloads - .pending - .insert(dl.token, (dl, handle)) - .is_none(), - "dl token queued more than once" - ); + let is_new = self.downloads.pending_paths.insert(dl.path.to_path_buf()); + assert!(is_new, "path queued for download more than once"); + let previous = self.downloads.pending.insert(dl.token, (dl, handle)); + assert!(previous.is_none(), "dl token queued more than once"); } Ok(()) } @@ -477,8 +475,9 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> { let result = result.with_context(|| format!("download of {} failed", path.display()))?; + let is_new = self.fresh.insert(path.to_path_buf()); assert!( - self.fresh.insert(path.to_path_buf()), + is_new, "downloaded the index file `{}` twice", path.display() ); @@ -634,10 +633,8 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> { let token = 
self.downloads.next; self.downloads.next += 1; debug!("downloading {} as {}", path.display(), token); - assert!( - self.downloads.pending_paths.insert(path.to_path_buf()), - "path queued for download more than once" - ); + let is_new = self.downloads.pending_paths.insert(path.to_path_buf()); + assert!(is_new, "path queued for download more than once"); // Each write should go to self.downloads.pending[&token].data. // Since the write function must be 'static, we access downloads through a thread-local.