stargz: fix large chunk processing for fscache
Signed-off-by: Yan Song <yansong.ys@antfin.com>
imeoer committed Jul 1, 2022
1 parent ff7cdda commit 88f0e43
Showing 9 changed files with 59 additions and 39 deletions.
2 changes: 1 addition & 1 deletion rafs/src/metadata/direct_v6.rs
@@ -976,7 +976,7 @@ impl RafsInode for OndiskInodeWrapper {
                 .map(Arc::clone);
         });
 
-        find.ok_or_else(|| enoent!("can't find chunk info"))
+        find.ok_or_else(|| enoent!(format!("can't find chunk info {}", chunk_addr.block_addr())))
     }
     // TODO(tianqian.zyf): Use get_xattrs implement it
     fn get_xattr(&self, name: &OsStr) -> Result<Option<XattrValue>> {
31 changes: 25 additions & 6 deletions src/bin/nydus-image/builder/stargz.rs
@@ -612,6 +612,27 @@ impl StargzBuilder {
 
         let mut chunk_map = HashMap::new();
 
+        let mut blob_chunks = Vec::new();
+        for node in &bootstrap_ctx.nodes {
+            if node.overlay.is_lower_layer() {
+                continue;
+            }
+            for chunk in node.chunks.iter() {
+                blob_chunks.push(chunk.clone());
+            }
+        }
+        blob_chunks.sort_by(|a, b| {
+            a.inner
+                .uncompressed_offset()
+                .cmp(&b.inner.uncompressed_offset())
+        });
+        for chunk in &mut blob_chunks {
+            let chunk_index = blob_ctx.alloc_index()?;
+            chunk.inner.set_index(chunk_index);
+            chunk_map.insert(chunk.inner.id(), chunk_index);
+            blob_ctx.add_chunk_meta_info(&chunk.inner)?;
+        }
+
         // Set blob index and inode digest for upper nodes
         for node in &mut bootstrap_ctx.nodes {
             if node.overlay.is_lower_layer() {
@@ -624,13 +645,11 @@ impl StargzBuilder {
                 chunk.inner.set_blob_index(blob_index);
                 if let Some(chunk_index) = chunk_map.get(chunk.inner.id()) {
                     chunk.inner.set_index(*chunk_index);
-                } else {
-                    let chunk_index = blob_ctx.alloc_index()?;
-                    chunk.inner.set_index(chunk_index);
-                    chunk_map.insert(*chunk.inner.id(), chunk.inner.index());
-                    blob_ctx.add_chunk_meta_info(&chunk.inner)?;
                 }
-                decompressed_blob_size += chunk.inner.uncompressed_size() as u64;
+                decompressed_blob_size = std::cmp::max(
+                    chunk.inner.uncompressed_offset() + chunk.inner.uncompressed_size() as u64,
+                    decompressed_blob_size,
+                );
                 compressed_blob_size += chunk.inner.compressed_size() as u64;
                 inode_hasher.digest_update(chunk.inner.id().as_ref());
             }
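Note: the pre-pass added above collects every upper-layer chunk, sorts by uncompressed offset, and only then allocates chunk indexes, so index order now matches offset order, which is the property the binary search in storage/src/meta/mod.rs relies on (see the last file in this commit). It also replaces the running sum with the largest chunk end offset, since chunks shared by several files would otherwise be counted more than once. A minimal standalone sketch of that logic, using a simplified stand-in type rather than the builder's real chunk wrapper:

// Simplified sketch of the new pre-pass; ChunkDesc stands in for the
// builder's chunk wrapper and is not a real nydus type.
#[derive(Clone)]
struct ChunkDesc {
    uncompressed_offset: u64,
    uncompressed_size: u32,
    index: u32,
}

fn index_chunks(mut chunks: Vec<ChunkDesc>) -> (Vec<ChunkDesc>, u64) {
    // Allocate indexes in increasing uncompressed-offset order.
    chunks.sort_by_key(|c| c.uncompressed_offset);
    let mut blob_size = 0u64;
    for (i, c) in chunks.iter_mut().enumerate() {
        c.index = i as u32;
        // The uncompressed blob size is the furthest chunk end, not a sum of
        // chunk sizes: several files may reference the same chunk.
        blob_size = blob_size.max(c.uncompressed_offset + u64::from(c.uncompressed_size));
    }
    (chunks, blob_size)
}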
2 changes: 1 addition & 1 deletion src/bin/nydusd/fs_cache.rs
@@ -502,7 +502,7 @@ impl FsCacheHandler {
 
         match BLOB_FACTORY.new_blob_cache(config.factory_config(), &blob_ref) {
             Err(_e) => Err(-libc::ENOENT),
-            Ok(blob) => match blob.blob_size() {
+            Ok(blob) => match blob.blob_uncompressed_size() {
                 Err(_e) => Err(-libc::EIO),
                 Ok(v) => Ok((blob, v)),
             },
11 changes: 8 additions & 3 deletions storage/src/cache/cachedfile.rs
@@ -47,7 +47,8 @@ pub(crate) struct FileCacheEntry {
     pub(crate) runtime: Arc<Runtime>,
     pub(crate) workers: Arc<AsyncWorkerMgr>,
 
-    pub(crate) blob_size: u64,
+    pub(crate) blob_compressed_size: u64,
+    pub(crate) blob_uncompressed_size: u64,
     pub(crate) compressor: compress::Algorithm,
     pub(crate) digester: digest::Algorithm,
     // Whether `get_blob_object()` is supported.
@@ -89,8 +90,12 @@ impl BlobCache for FileCacheEntry {
         self.blob_info.blob_id()
     }
 
-    fn blob_size(&self) -> Result<u64> {
-        Ok(self.blob_size)
+    fn blob_compressed_size(&self) -> Result<u64> {
+        Ok(self.blob_compressed_size)
+    }
+
+    fn blob_uncompressed_size(&self) -> Result<u64> {
+        Ok(self.blob_uncompressed_size)
     }
 
     fn compressor(&self) -> compress::Algorithm {
6 changes: 5 additions & 1 deletion storage/src/cache/dummycache.rs
@@ -49,10 +49,14 @@ impl BlobCache for DummyCache {
         &self.blob_id
     }
 
-    fn blob_size(&self) -> Result<u64> {
+    fn blob_compressed_size(&self) -> Result<u64> {
         self.reader.blob_size().map_err(|e| eother!(e))
     }
 
+    fn blob_uncompressed_size(&self) -> Result<u64> {
+        unimplemented!();
+    }
+
     fn compressor(&self) -> compress::Algorithm {
         self.compressor
     }
6 changes: 4 additions & 2 deletions storage/src/cache/filecache/mod.rs
@@ -182,7 +182,8 @@ impl FileCacheEntry {
             .get_reader(blob_info.blob_id())
             .map_err(|_e| eio!("failed to get blob reader"))?;
 
-        let blob_size = Self::get_blob_size(&reader, &blob_info)?;
+        let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
+        let blob_uncompressed_size = blob_info.uncompressed_size();
         let compressor = blob_info.compressor();
         let digester = blob_info.digester();
         let is_stargz = blob_info.is_stargz();
@@ -225,7 +226,8 @@ impl FileCacheEntry {
             runtime,
             workers,
 
-            blob_size,
+            blob_compressed_size,
+            blob_uncompressed_size,
             compressor,
             digester,
             is_get_blob_object_supported,
5 changes: 3 additions & 2 deletions storage/src/cache/fscache/mod.rs
@@ -179,7 +179,7 @@ impl FileCacheEntry {
             .backend
             .get_reader(blob_info.blob_id())
             .map_err(|_e| eio!("failed to get blob reader"))?;
-        let blob_size = blob_info.uncompressed_size();
+        let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
         let meta = if blob_info.meta_ci_is_valid() {
             Some(Arc::new(BlobMetaInfo::new(
                 &blob_file_path,
@@ -201,7 +201,8 @@ impl FileCacheEntry {
             runtime,
             workers,
 
-            blob_size,
+            blob_compressed_size,
+            blob_uncompressed_size: blob_info.uncompressed_size(),
             compressor: blob_info.compressor(),
             digester: blob_info.digester(),
             is_get_blob_object_supported: true,
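Note: the fscache entry now gets its compressed size from the backend reader, the same way the file cache above does, and keeps the uncompressed size from the blob metadata in a separate field instead of overloading the old blob_size. A rough sketch of the split, with assumed names for illustration only:

// Illustrative only: the two sizes a cache entry tracks after this change.
struct BlobSizes {
    // Size of the blob object stored in the backend (the compressed layer);
    // used to bound reads of compressed data.
    compressed: u64,
    // Size of the blob after full decompression; reported as the fscache
    // object size (see the fs_cache.rs change above).
    uncompressed: u64,
}

fn split_sizes(backend_object_size: u64, meta_uncompressed_size: u64) -> BlobSizes {
    BlobSizes {
        compressed: backend_object_size,
        uncompressed: meta_uncompressed_size,
    }
}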
7 changes: 5 additions & 2 deletions storage/src/cache/mod.rs
@@ -125,7 +125,10 @@ pub trait BlobCache: Send + Sync {
     fn blob_id(&self) -> &str;
 
     /// Get size of the blob object.
-    fn blob_size(&self) -> Result<u64>;
+    fn blob_uncompressed_size(&self) -> Result<u64>;
+
+    /// Get size of the blob object.
+    fn blob_compressed_size(&self) -> Result<u64>;
 
     /// Get data compression algorithm to handle chunks in the blob.
     fn compressor(&self) -> compress::Algorithm;
@@ -252,7 +255,7 @@ pub trait BlobCache: Send + Sync {
         let raw_chunk = if chunk.is_compressed() {
             // Need a scratch buffer to decompress compressed data.
             let c_size = if self.is_stargz() {
-                let blob_size = self.blob_size()?;
+                let blob_size = self.blob_compressed_size()?;
                 let max_size = blob_size.checked_sub(offset).ok_or_else(|| {
                     einval!("chunk compressed offset is bigger than blob file size")
                 })?;
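Note: a stargz chunk does not record an exact compressed size, so the scratch buffer for decompression is bounded by the distance from the chunk's compressed offset to the end of the blob. That bound only makes sense against the compressed blob size; under fscache the old blob_size() returned the uncompressed size, which is likely why large chunks near the end of a layer failed. A rough illustration with made-up numbers, not taken from a real image:

// Read bound for a compressed stargz chunk, measured against the blob size
// reported by the cache entry (simplified from the code above).
fn scratch_bound(blob_size: u64, chunk_compressed_offset: u64) -> Option<u64> {
    blob_size.checked_sub(chunk_compressed_offset)
}

fn main() {
    let compressed = 40u64 << 20; // 40 MiB gzip layer in the registry
    let uncompressed = 120u64 << 20; // 120 MiB of file data it expands to
    let chunk_offset = 39u64 << 20; // a chunk starting near the end of the layer
    // Against the compressed size, about 1 MiB of data remains.
    assert_eq!(scratch_bound(compressed, chunk_offset), Some(1 << 20));
    // Against the uncompressed size (the old fscache value), the bound is
    // about 81 MiB, far past the real end of the compressed blob.
    assert_eq!(scratch_bound(uncompressed, chunk_offset), Some(81 << 20));
}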
28 changes: 7 additions & 21 deletions storage/src/meta/mod.rs
@@ -492,7 +492,7 @@ impl BlobMetaInfo {
             Err(einval!(format!(
                 "entry not found index {} infos.len {}",
                 index,
-                infos.len()
+                infos.len(),
             )))
         }
     }
@@ -569,7 +569,12 @@ impl BlobMetaInfo {
         if (!self.state.is_stargz && entry.compressed_end() > self.state.compressed_size)
             || entry.uncompressed_end() > self.state.uncompressed_size
         {
-            Err(einval!())
+            Err(einval!(format!(
+                "{} uncompressed_end {} > uncompressed_size {}",
+                self.state.blob_index,
+                entry.uncompressed_end(),
+                self.state.uncompressed_size,
+            )))
         } else {
             Ok(())
         }
@@ -677,25 +682,6 @@ impl BlobMetaState {
         let mut start = 0;
         let mut end = 0;
 
-        if self.is_stargz {
-            // FIXME: since stargz chunks are not currently allocated chunk index in the order of uncompressed_offset,
-            // a binary search is not available for now, here is a heavy overhead workaround, need to be fixed.
-            for i in 0..self.chunk_count {
-                let off = if compressed {
-                    chunks[i as usize].compressed_offset()
-                } else {
-                    chunks[i as usize].uncompressed_offset()
-                };
-                if addr == off {
-                    return Ok(i as usize);
-                }
-            }
-            return Err(einval!(format!(
-                "can't find stargz chunk by offset {}",
-                addr,
-            )));
-        }
-
         while left < right {
             let mid = left + size / 2;
             // SAFETY: the call is made safe by the following invariants:
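Note: the deleted workaround scanned every chunk because stargz chunk indexes used to be allocated out of uncompressed-offset order; with the builder change at the top of this commit the chunk table is sorted by offset, so the regular binary search below the removed block handles stargz blobs as well. A compact sketch of that kind of lookup (simplified: the real code searches the on-disk chunk-info table and also handles compressed offsets):

// Find the chunk covering `addr`, assuming `offsets` is sorted ascending and
// chunk i covers [offsets[i], offsets[i] + sizes[i]).
fn find_chunk(offsets: &[u64], sizes: &[u64], addr: u64) -> Option<usize> {
    let i = offsets.partition_point(|&off| off <= addr).checked_sub(1)?;
    if addr < offsets[i] + sizes[i] {
        Some(i)
    } else {
        None
    }
}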
