Skip to content

Commit

Permalink
ref(oci/client): update unpack_archive_layer to take cache; make pub
Browse files Browse the repository at this point in the history
Signed-off-by: Vaughn Dice <vaughn.dice@fermyon.com>
  • Loading branch information
vdice committed May 23, 2024
1 parent 8e99755 commit 3507f88
Showing 1 changed file with 41 additions and 39 deletions.
80 changes: 41 additions & 39 deletions crates/oci/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl Client {
this.cache.write_wasm(&bytes, &layer.digest).await?;
}
ARCHIVE_MEDIATYPE => {
this.unpack_archive_layer(&bytes, &layer.digest).await?;
unpack_archive_layer(&this.cache, &bytes, &layer.digest).await?;
}
_ => {
this.cache.write_data(&bytes, &layer.digest).await?;
Expand Down Expand Up @@ -515,44 +515,6 @@ impl Client {
}
}

/// Unpack archive layer into self.cache
async fn unpack_archive_layer(
&self,
bytes: impl AsRef<[u8]>,
digest: impl AsRef<str>,
) -> Result<()> {
// Write archive layer to cache as usual
self.cache.write_data(&bytes, &digest).await?;

// Unpack archive into a staging dir
let path = self
.cache
.data_file(&digest)
.context("unable to read archive layer from cache")?;
let staging_dir = tempfile::tempdir()?;
crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?;

// Traverse unpacked contents and if a file, write to cache by digest
// (if it doesn't already exist)
for entry in WalkDir::new(staging_dir.path()) {
let entry = entry?;
if entry.file_type().is_file() && !entry.file_type().is_dir() {
let bytes = tokio::fs::read(entry.path()).await?;
let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes));
if self.cache.data_file(&digest).is_ok() {
tracing::debug!(
"Skipping unpacked asset {:?}; file already exists",
entry.path()
);
} else {
tracing::debug!("Adding unpacked asset {:?} to cache", entry.path());
self.cache.write_data(bytes, &digest).await?;
}
}
}
Ok(())
}

/// Save a credential set containing the registry username and password.
pub async fn login(
server: impl AsRef<str>,
Expand Down Expand Up @@ -655,6 +617,46 @@ impl Client {
}
}

/// Unpack contents of the provided archive layer, represented by bytes and its
/// corresponding digest, into the provided cache.
/// A temporary staging directory is created via tempfile::tempdir() to store
/// the unpacked contents prior to writing to the cache.
pub async fn unpack_archive_layer(
cache: &Cache,
bytes: impl AsRef<[u8]>,
digest: impl AsRef<str>,
) -> Result<()> {
// Write archive layer to cache as usual
cache.write_data(&bytes, &digest).await?;

// Unpack archive into a staging dir
let path = cache
.data_file(&digest)
.context("unable to read archive layer from cache")?;
let staging_dir = tempfile::tempdir()?;
crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?;

// Traverse unpacked contents and if a file, write to cache by digest
// (if it doesn't already exist)
for entry in WalkDir::new(staging_dir.path()) {
let entry = entry?;
if entry.file_type().is_file() && !entry.file_type().is_dir() {
let bytes = tokio::fs::read(entry.path()).await?;
let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes));
if cache.data_file(&digest).is_ok() {
tracing::debug!(
"Skipping unpacked asset {:?}; file already exists",
entry.path()
);
} else {
tracing::debug!("Adding unpacked asset {:?} to cache", entry.path());
cache.write_data(bytes, &digest).await?;
}
}
}
Ok(())
}

fn digest_from_url(manifest_url: &str) -> Option<String> {
// The URL is in the form "https://host/v2/refname/manifests/sha256:..."
let manifest_url = Url::parse(manifest_url).ok()?;
Expand Down

0 comments on commit 3507f88

Please sign in to comment.