From 4f444fc7a845b2e1005b179792891310413e4586 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Thu, 19 Sep 2024 13:00:22 -0400 Subject: [PATCH] Make it sound --- crates/uv-distribution/src/source/mod.rs | 60 ++++++++++++++++++++---- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs index 9bcf1a7d26a81..0b72c6befbb39 100644 --- a/crates/uv-distribution/src/source/mod.rs +++ b/crates/uv-distribution/src/source/mod.rs @@ -23,7 +23,7 @@ use platform_tags::Tags; use pypi_types::{HashDigest, Metadata12, Metadata23, RequiresTxt}; use reqwest::Response; use tokio_util::compat::FuturesAsyncReadCompatExt; -use tracing::{debug, info_span, instrument, Instrument}; +use tracing::{debug, info_span, instrument, warn, Instrument}; use url::Url; use uv_cache::{Cache, CacheBucket, CacheEntry, CacheShard, Removal, WheelCache}; use uv_cache_info::CacheInfo; @@ -611,7 +611,6 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // new revision, to collect the source and built artifacts. let revision = Revision::new(); - // Download the source distribution. debug!("Downloading source distribution: {source}"); let entry = cache_shard.shard(revision.id()).entry(filename); let hashes = self @@ -637,10 +636,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { })?; // If the archive is missing the required hashes, force a refresh. - if revision.has_digests(hashes) { - Ok(revision) - } else { - client + if !revision.has_digests(hashes) { + return client .managed(|client| async move { client .cached_client() @@ -651,8 +648,42 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { CachedClientError::Client(err) => Error::Client(err), }) }) - .await + .await; } + + // If the archive is missing its source, force a refresh, but write to the same revision. + let entry = cache_shard.shard(revision.id()).entry(filename); + if !entry.path().is_dir() { + warn!("Re-downloading missing source distribution: {source}"); + + let download = |response| { + async { + debug!("Downloading source distribution: {source}"); + let hashes = self + .download_archive(response, source, filename, ext, entry.path(), hashes) + .await?; + + Ok(revision.with_hashes(hashes)) + } + .boxed_local() + .instrument(info_span!("download", source_dist = %source)) + }; + + return client + .managed(|client| async move { + client + .cached_client() + .skip_cache(Self::request(url.clone(), client)?, &cache_entry, download) + .await + .map_err(|err| match err { + CachedClientError::Callback(err) => err, + CachedClientError::Client(err) => Error::Client(err), + }) + }) + .await; + } + + Ok(revision) } /// Build a source distribution from a local archive (e.g., `.tar.gz` or `.zip`). @@ -864,7 +895,20 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { if let Some(pointer) = LocalRevisionPointer::read_from(&revision_entry)? { if *pointer.cache_info() == cache_info { if pointer.revision().has_digests(hashes) { - return Ok(pointer); + let entry = cache_shard.shard(pointer.revision().id()).entry("source"); + + // If the source distribution itself is absent, we have to re-fetch it. + if entry.path().is_dir() { + return Ok(pointer); + } + + warn!("Re-extracting missing source distribution: {source}"); + let hashes = self + .persist_archive(&resource.path, resource.ext, entry.path(), hashes) + .await?; + if hashes == pointer.revision().hashes() { + return Ok(pointer); + } } } }