From 3507f88dd61bc9e995e0f742091f3e7553cd3132 Mon Sep 17 00:00:00 2001 From: Vaughn Dice Date: Mon, 20 May 2024 14:36:56 -0600 Subject: [PATCH] ref(oci/client): update unpack_archive_layer to take cache; make pub Signed-off-by: Vaughn Dice --- crates/oci/src/client.rs | 80 ++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/crates/oci/src/client.rs b/crates/oci/src/client.rs index 36d338d8b2..f1cbcb2896 100644 --- a/crates/oci/src/client.rs +++ b/crates/oci/src/client.rs @@ -413,7 +413,7 @@ impl Client { this.cache.write_wasm(&bytes, &layer.digest).await?; } ARCHIVE_MEDIATYPE => { - this.unpack_archive_layer(&bytes, &layer.digest).await?; + unpack_archive_layer(&this.cache, &bytes, &layer.digest).await?; } _ => { this.cache.write_data(&bytes, &layer.digest).await?; @@ -515,44 +515,6 @@ impl Client { } } - /// Unpack archive layer into self.cache - async fn unpack_archive_layer( - &self, - bytes: impl AsRef<[u8]>, - digest: impl AsRef, - ) -> Result<()> { - // Write archive layer to cache as usual - self.cache.write_data(&bytes, &digest).await?; - - // Unpack archive into a staging dir - let path = self - .cache - .data_file(&digest) - .context("unable to read archive layer from cache")?; - let staging_dir = tempfile::tempdir()?; - crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?; - - // Traverse unpacked contents and if a file, write to cache by digest - // (if it doesn't already exist) - for entry in WalkDir::new(staging_dir.path()) { - let entry = entry?; - if entry.file_type().is_file() && !entry.file_type().is_dir() { - let bytes = tokio::fs::read(entry.path()).await?; - let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes)); - if self.cache.data_file(&digest).is_ok() { - tracing::debug!( - "Skipping unpacked asset {:?}; file already exists", - entry.path() - ); - } else { - tracing::debug!("Adding unpacked asset {:?} to cache", entry.path()); - self.cache.write_data(bytes, &digest).await?; - } - } - } - Ok(()) - } - /// Save a credential set containing the registry username and password. pub async fn login( server: impl AsRef, @@ -655,6 +617,46 @@ impl Client { } } +/// Unpack contents of the provided archive layer, represented by bytes and its +/// corresponding digest, into the provided cache. +/// A temporary staging directory is created via tempfile::tempdir() to store +/// the unpacked contents prior to writing to the cache. +pub async fn unpack_archive_layer( + cache: &Cache, + bytes: impl AsRef<[u8]>, + digest: impl AsRef, +) -> Result<()> { + // Write archive layer to cache as usual + cache.write_data(&bytes, &digest).await?; + + // Unpack archive into a staging dir + let path = cache + .data_file(&digest) + .context("unable to read archive layer from cache")?; + let staging_dir = tempfile::tempdir()?; + crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?; + + // Traverse unpacked contents and if a file, write to cache by digest + // (if it doesn't already exist) + for entry in WalkDir::new(staging_dir.path()) { + let entry = entry?; + if entry.file_type().is_file() && !entry.file_type().is_dir() { + let bytes = tokio::fs::read(entry.path()).await?; + let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes)); + if cache.data_file(&digest).is_ok() { + tracing::debug!( + "Skipping unpacked asset {:?}; file already exists", + entry.path() + ); + } else { + tracing::debug!("Adding unpacked asset {:?} to cache", entry.path()); + cache.write_data(bytes, &digest).await?; + } + } + } + Ok(()) +} + fn digest_from_url(manifest_url: &str) -> Option { // The URL is in the form "https://host/v2/refname/manifests/sha256:..." let manifest_url = Url::parse(manifest_url).ok()?;