Skip to content

Commit

Permalink
Auto merge of #6297 - alexcrichton:beta-net, r=alexcrichton
Browse files Browse the repository at this point in the history
[beta] Timeout batch downloads, not each download

This is a beta backport of #6285
  • Loading branch information
bors committed Nov 10, 2018
2 parents 2c011df + 9d41017 commit 5d96734
Showing 1 changed file with 40 additions and 41 deletions.
81 changes: 40 additions & 41 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ pub struct Downloads<'a, 'cfg: 'a> {
largest: (u64, String),
start: Instant,
success: bool,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here instead of in libcurl
/// because we want to apply timeouts to an entire batch of operations, not
/// any one particular single operatino
timeout: ops::HttpTimeout, // timeout configuration
updated_at: Cell<Instant>, // last time we received bytes
next_speed_check: Cell<Instant>, // if threshold isn't 0 by this time, error
next_speed_check_bytes_threshold: Cell<u64>, // decremented when we receive bytes
}

struct Download<'cfg> {
Expand All @@ -293,24 +304,7 @@ struct Download<'cfg> {

/// The moment we started this transfer at
start: Instant,

/// Last time we noticed that we got some more data from libcurl
updated_at: Cell<Instant>,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here because we have a
/// `Multi` with a lot of active transfers but between transfers finishing
/// we perform some possibly slow synchronous work (like grabbing file
/// locks, extracting tarballs, etc). The default timers on our `Multi` keep
/// running during this work, but we don't want them to count towards timing
/// everythig out. As a result, we manage this manually and take the time
/// for synchronous work into account manually.
timeout: ops::HttpTimeout,
timed_out: Cell<Option<String>>,
next_speed_check: Cell<Instant>,
next_speed_check_bytes_threshold: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
Expand Down Expand Up @@ -359,6 +353,7 @@ impl<'cfg> PackageSet<'cfg> {

pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'cfg>> {
assert!(!self.downloading.replace(true));
let timeout = ops::HttpTimeout::new(self.config)?;
Ok(Downloads {
start: Instant::now(),
set: self,
Expand All @@ -375,6 +370,10 @@ impl<'cfg> PackageSet<'cfg> {
downloaded_bytes: 0,
largest: (0, String::new()),
success: false,
updated_at: Cell::new(Instant::now()),
timeout,
next_speed_check: Cell::new(Instant::now()),
next_speed_check_bytes_threshold: Cell::new(0),
})
}

Expand Down Expand Up @@ -446,7 +445,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
debug!("downloading {} as {}", id, token);
assert!(self.pending_ids.insert(id.clone()));

let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
let (mut handle, _timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
Expand Down Expand Up @@ -501,7 +500,6 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.set.config.shell().status("Downloading", "crates ...")?;
}

let now = Instant::now();
let dl = Download {
token,
data: RefCell::new(Vec::new()),
Expand All @@ -511,11 +509,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
updated_at: Cell::new(now),
timeout,
timed_out: Cell::new(None),
next_speed_check: Cell::new(now),
next_speed_check_bytes_threshold: Cell::new(0),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
Expand Down Expand Up @@ -638,10 +632,8 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// active downloads to make sure they don't fire because of a slowly
// extracted tarball.
let finish_dur = start.elapsed();
for (dl, _) in self.pending.values_mut() {
dl.updated_at.set(dl.updated_at.get() + finish_dur);
dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
}
self.updated_at.set(self.updated_at.get() + finish_dur);
self.next_speed_check.set(self.next_speed_check.get() + finish_dur);

let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Expand All @@ -652,12 +644,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let mut handle = self.set.multi.add(handle)?;
let now = Instant::now();
handle.set_token(dl.token)?;
self.updated_at.set(now);
self.next_speed_check.set(now + self.timeout.dur);
self.next_speed_check_bytes_threshold.set(self.timeout.low_speed_limit as u64);
dl.timed_out.set(None);
dl.updated_at.set(now);
dl.current.set(0);
dl.total.set(0);
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
self.pending.insert(dl.token, (dl, handle));
Ok(())
}
Expand Down Expand Up @@ -712,25 +704,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
dl.total.set(total);
let now = Instant::now();
if cur != dl.current.get() {
let delta = cur - dl.current.get();
let threshold = self.next_speed_check_bytes_threshold.get();

dl.current.set(cur);
dl.updated_at.set(now);
self.updated_at.set(now);

if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(
dl.current.get() + dl.timeout.low_speed_limit as u64,
if delta >= threshold {
self.next_speed_check.set(now + self.timeout.dur);
self.next_speed_check_bytes_threshold.set(
self.timeout.low_speed_limit as u64,
);
} else {
self.next_speed_check_bytes_threshold.set(threshold - delta);
}
}
if !self.tick(WhyTick::DownloadUpdate).is_ok() {
return false
}

// If we've spent too long not actually receiving any data we time out.
if now - dl.updated_at.get() > dl.timeout.dur {
if now - self.updated_at.get() > self.timeout.dur {
self.updated_at.set(now);
let msg = format!("failed to download any data for `{}` within {}s",
dl.id,
dl.timeout.dur.as_secs());
self.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}
Expand All @@ -739,13 +737,14 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// limit, see if we've transferred enough data during this threshold. If
// it fails this check then we fail because the download is going too
// slowly.
if now >= dl.next_speed_check.get() {
assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
if now >= self.next_speed_check.get() {
self.next_speed_check.set(now + self.timeout.dur);
assert!(self.next_speed_check_bytes_threshold.get() > 0);
let msg = format!("download of `{}` failed to transfer more \
than {} bytes in {}s",
dl.id,
dl.timeout.low_speed_limit,
dl.timeout.dur.as_secs());
self.timeout.low_speed_limit,
self.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}
Expand Down

0 comments on commit 5d96734

Please sign in to comment.