Merge pull request #349 from MercyMM/master
storage: update BlobPrefetchRequest fields type from u32 to u64
liubogithub authored Mar 25, 2022
2 parents 4fc8f77 + 05410dd commit ea0234f
Showing 11 changed files with 57 additions and 53 deletions.
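For context (an illustrative note, not text from the commit): with u32 fields, a BlobPrefetchRequest could not describe any blob region past the 4 GiB mark, which forced the lossy try_into/as casts removed below. A hypothetical request that only becomes expressible after this change, using the struct from storage/src/device.rs:

// Hypothetical usage sketch; field values are invented for illustration.
let req = BlobPrefetchRequest {
    blob_id: String::from("example-blob-digest"), // hypothetical blob ID
    offset: 5u64 << 30, // a 5 GiB offset, not representable as u32
    len: 0x0020_0000,   // 2 MiB, matching the clamp in blobfs/src/sync_io.rs
};
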
8 changes: 0 additions & 8 deletions blobfs/src/lib.rs
@@ -27,8 +27,6 @@ use rafs::{
 use serde::Deserialize;
 use std::any::Any;
-#[cfg(feature = "virtiofs")]
-use std::convert::TryInto;
 #[cfg(feature = "virtiofs")]
 use std::ffi::CStr;
 use std::ffi::CString;
 #[cfg(feature = "virtiofs")]
@@ -165,8 +163,6 @@ impl BootstrapArgs {
 /// combination of mount namespaces and the pivot_root system call.
 pub struct BlobFs {
     pfs: PassthroughFs,
-    #[cfg(feature = "virtiofs")]
-    cfg: Config,
     #[allow(dead_code)]
     bootstrap_args: BootstrapArgs,
 }
@@ -176,14 +172,10 @@ impl BlobFs {
     pub fn new(cfg: Config) -> io::Result<BlobFs> {
         trace!("BlobFs config is: {:?}", cfg);

-        #[cfg(feature = "virtiofs")]
-        let cfg_bak = cfg.clone();
         let bootstrap_args = Self::load_bootstrap(&cfg)?;
         let pfs = PassthroughFs::new(cfg.ps_config)?;
         Ok(BlobFs {
             pfs,
-            #[cfg(feature = "virtiofs")]
-            cfg: cfg_bak,
             bootstrap_args,
         })
     }

50 changes: 28 additions & 22 deletions blobfs/src/sync_io.rs
@@ -24,7 +24,18 @@ use storage::device::BlobPrefetchRequest;

 impl BlobFs {
     #[cfg(feature = "virtiofs")]
-    fn get_blob_id_and_size(&self, inode: Inode) -> io::Result<(String, i64)> {
+    fn check_st_size(blob_id: &Path, size: i64) -> io::Result<()> {
+        if size < 0 {
+            return Err(einval!(format!(
+                "load_chunks_on_demand: blob_id {:?}, size: {:?} is less than 0",
+                blob_id, size
+            )));
+        }
+        Ok(())
+    }
+
+    #[cfg(feature = "virtiofs")]
+    fn get_blob_id_and_size(&self, inode: Inode) -> io::Result<(String, u64)> {
         // locate blob file that the inode refers to
         let blob_id_full_path = self.pfs.readlinkat_proc_file(inode)?;
         let parent = blob_id_full_path
@@ -37,12 +48,6 @@ impl BlobFs {
             blob_id_full_path
         );

-        debug_assert!(
-            parent
-                == Path::new(self.cfg.ps_config.root_dir.as_str())
-                    .join(self.bootstrap_args.blob_cache_dir.as_str())
-        );
-
         let blob_file = Self::open_file(
             libc::AT_FDCWD,
             &blob_id_full_path.as_path(),
@@ -60,31 +65,32 @@ impl BlobFs {

         trace!("load_chunks_on_demand: blob_id {:?}", blob_id);

-        Ok((blob_id.to_os_string().into_string().unwrap(), st.st_size))
+        Self::check_st_size(blob_id_full_path.as_path(), st.st_size)?;
+
+        Ok((
+            blob_id.to_os_string().into_string().unwrap(),
+            st.st_size as u64,
+        ))
     }

     #[cfg(feature = "virtiofs")]
-    fn load_chunks_on_demand(&self, inode: Inode, foffset: u64) -> io::Result<()> {
+    fn load_chunks_on_demand(&self, inode: Inode, offset: u64) -> io::Result<()> {
         // prepare BlobPrefetchRequest and call device.prefetch().
         // Make sure prefetch doesn't use delay_persist as we need the
         // data immediately.
         let (blob_id, size) = self.get_blob_id_and_size(inode)?;
-        let offset: u32 = foffset.try_into().map_err(|_| {
-            einval!(format!(
-                "blobfs: load_chunks_on_demand: foffset {} is larger than u32::MAX",
-                foffset
-            ))
-        })?;
-        let len = (size - offset as i64).try_into().map_err(|_| {
-            einval!(format!(
-                "blobfs: load_chunks_on_demand: len {} is larger than u32::MAX",
-                (size - offset as i64)
-            ))
-        })?;
+        if size <= offset {
+            return Err(einval!(format!(
+                "load_chunks_on_demand: blob_id {:?}, offset {:?} is larger than size {:?}",
+                blob_id, offset, size
+            )));
+        }
+
+        let len = size - offset;
         let req = BlobPrefetchRequest {
             blob_id,
             offset,
-            len: min(len, 0x0020_0000_u32), // 2M range
+            len: min(len, 0x0020_0000_u64), // 2M range
         };

         self.bootstrap_args.fetch_range_sync(&[req]).map_err(|e| {

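The rewritten load_chunks_on_demand validates the offset against the blob size and caps each request at 2 MiB. A standalone sketch of that arithmetic (a hypothetical helper, not part of the patch):

// Returns the prefetch length for a blob of `blob_size` starting at `offset`,
// or None where the patch returns einval (offset at or past end of blob).
fn prefetch_len(blob_size: u64, offset: u64) -> Option<u64> {
    if blob_size <= offset {
        None
    } else {
        Some((blob_size - offset).min(0x0020_0000)) // at most 2 MiB per request
    }
}

Under the old u32 code, a 4 GiB blob read from offset 0 would fail the len try_into; with u64 fields it simply yields a 2 MiB request.
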
4 changes: 2 additions & 2 deletions rafs/src/fs.rs
@@ -491,8 +491,8 @@ impl Rafs {
             .iter()
             .map(|b| BlobPrefetchRequest {
                 blob_id: b.blob_id().to_owned(),
-                offset: b.readahead_offset() as u32,
-                len: b.readahead_size() as u32,
+                offset: b.readahead_offset(),
+                len: b.readahead_size(),
             })
             .collect::<Vec<BlobPrefetchRequest>>();
         device.prefetch(&[], &prefetches).unwrap_or_else(|e| {

6 changes: 3 additions & 3 deletions storage/src/backend/localfs.rs
@@ -124,7 +124,7 @@ impl BlobReader for LocalFsEntry {
             .map_err(|e| LocalFsError::ReadVecBlob(e).into())
     }

-    fn prefetch_blob_data_range(&self, ra_offset: u32, ra_size: u32) -> BackendResult<()> {
+    fn prefetch_blob_data_range(&self, ra_offset: u64, ra_size: u64) -> BackendResult<()> {
         if !self.readahead {
             return Ok(());
         }
@@ -153,13 +153,13 @@ impl BlobReader for LocalFsEntry {
         }

         // Prefetch data according to the hint if it's valid.
-        let end = ra_offset as u64 + ra_size as u64;
+        let end = ra_offset + ra_size;
         if ra_size != 0 && end <= blob_size {
             info!(
                 "kick off hinted blob readahead offset {} len {}",
                 ra_offset, ra_size
             );
-            readahead(self.file.as_raw_fd(), ra_offset as u64, end);
+            readahead(self.file.as_raw_fd(), ra_offset, end);
         }

         // start access logging

2 changes: 1 addition & 1 deletion storage/src/backend/mod.rs
@@ -180,7 +180,7 @@ pub trait BlobReader: Send + Sync {
     /// This method only prefetch blob data from storage backends, it doesn't cache data in the
     /// blob cache subsystem. So it's useful for disk and file system based storage backends, but
     /// it may not help for Registry/OSS based storage backends.
-    fn prefetch_blob_data_range(&self, ra_offset: u32, ra_size: u32) -> BackendResult<()>;
+    fn prefetch_blob_data_range(&self, ra_offset: u64, ra_size: u64) -> BackendResult<()>;

     /// Stop the background data prefetching tasks.
     fn stop_data_prefetch(&self) -> BackendResult<()>;

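The trait doc above notes this hint mainly helps disk and file-system backends. As an illustration only (a hedged sketch, not code from this repository), a file-backed reader could service the hint with posix_fadvise(2); the helper name and wiring are assumptions:

use std::os::unix::io::RawFd;

// Sketch: advise the kernel to read a blob range ahead of time.
// `fd` is assumed to be an open descriptor for the blob file.
fn hint_readahead(fd: RawFd, offset: u64, len: u64) -> std::io::Result<()> {
    let ret = unsafe {
        libc::posix_fadvise(
            fd,
            offset as libc::off_t,
            len as libc::off_t,
            libc::POSIX_FADV_WILLNEED, // ask the kernel to prefetch the range
        )
    };
    if ret == 0 {
        Ok(())
    } else {
        // posix_fadvise returns the error code directly rather than setting errno
        Err(std::io::Error::from_raw_os_error(ret))
    }
}
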
2 changes: 1 addition & 1 deletion storage/src/backend/oss.rs
@@ -202,7 +202,7 @@ impl BlobReader for OssReader {
             .map(|size| size as usize)?)
     }

-    fn prefetch_blob_data_range(&self, _ra_offset: u32, _ra_size: u32) -> BackendResult<()> {
+    fn prefetch_blob_data_range(&self, _ra_offset: u64, _ra_size: u64) -> BackendResult<()> {
         Err(BackendError::Unsupported(
             "Oss backend does not support prefetch as per on-disk blob entries".to_string(),
         ))

2 changes: 1 addition & 1 deletion storage/src/backend/registry.rs
@@ -534,7 +534,7 @@ impl BlobReader for RegistryReader {
             .map_err(BackendError::Registry)
     }

-    fn prefetch_blob_data_range(&self, _ra_offset: u32, _ra_size: u32) -> BackendResult<()> {
+    fn prefetch_blob_data_range(&self, _ra_offset: u64, _ra_size: u64) -> BackendResult<()> {
         Err(BackendError::Unsupported(
             "Registry backend does not support prefetch as per on-disk blob entries".to_string(),
         ))

7 changes: 2 additions & 5 deletions storage/src/cache/worker.rs
@@ -304,11 +304,8 @@ impl AsyncWorkerMgr {
                     e
                 );
             }
-        } else if offset < u32::MAX as u64 && size < u32::MAX as u64 {
-            let _ = cache.reader().prefetch_blob_data_range(
-                offset as u32,
-                std::cmp::min(size as u32, u32::MAX - offset as u32),
-            );
+        } else {
+            let _ = cache.reader().prefetch_blob_data_range(offset, size);
         }

         Ok(())

4 changes: 2 additions & 2 deletions storage/src/device.rs
@@ -766,9 +766,9 @@ pub struct BlobPrefetchRequest {
     /// The ID of the blob to prefetch data for.
     pub blob_id: String,
     /// Offset into the blob to prefetch data.
-    pub offset: u32,
+    pub offset: u64,
     /// Size of data to prefetch.
-    pub len: u32,
+    pub len: u64,
 }

 /// Trait to provide direct access to underlying uncompressed blob file.

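Widening both fields also changes the overflow picture (an observation, not part of the diff): a sum such as ra_offset + ra_size in localfs.rs above can in principle wrap a u64, so a defensive caller might prefer checked arithmetic:

// Sketch: overflow-safe end-of-range computation for a prefetch request;
// None signals an invalid request whose end would wrap around u64.
fn request_end(offset: u64, len: u64) -> Option<u64> {
    offset.checked_add(len)
}
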
21 changes: 15 additions & 6 deletions storage/src/meta/mod.rs
@@ -618,17 +618,21 @@ impl BlobMetaState {
         let mut size = self.chunk_count as usize;
         let mut left = 0;
         let mut right = size;
+        let mut start = 0;
+        let mut end = 0;

         while left < right {
             let mid = left + size / 2;
             // SAFETY: the call is made safe by the following invariants:
             // - `mid >= 0`
             // - `mid < size`: `mid` is limited by `[left; right)` bound.
             let entry = unsafe { chunks.get_unchecked(mid) };
-            let (start, end) = if compressed {
-                (entry.compressed_offset(), entry.compressed_end())
+            if compressed {
+                start = entry.compressed_offset();
+                end = entry.compressed_end();
             } else {
-                (entry.uncompressed_offset(), entry.uncompressed_end())
+                start = entry.uncompressed_offset();
+                end = entry.uncompressed_end();
             };

             if start > addr {
@@ -642,7 +646,12 @@ impl BlobMetaState {
             size = right - left;
         }

-        Err(einval!())
+        // if addr == self.chunks[last].compressed_offset, return einval
+        // with error msg.
+        Err(einval!(format!(
+            "start: {}, end: {}, addr: {}",
+            start, end, addr
+        )))
     }
 }

@@ -885,8 +894,8 @@ mod tests {

         fn prefetch_blob_data_range(
             &self,
-            _blob_readahead_offset: u32,
-            _blob_readahead_size: u32,
+            _blob_readahead_offset: u64,
+            _blob_readahead_size: u64,
         ) -> BackendResult<()> {
             Ok(())
         }

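The loop in the first hunk is a binary search over chunk ranges sorted by start offset; the patch hoists start/end out of the loop so the final error can report the last range examined. A self-contained sketch of the same search (illustrative types, not the crate's API):

// Sketch: find the index of the range containing `addr` among ranges
// sorted by start offset, mirroring the search in BlobMetaState above.
fn find_range(ranges: &[(u64, u64)], addr: u64) -> Result<usize, String> {
    let mut left = 0;
    let mut right = ranges.len();
    while left < right {
        let mid = left + (right - left) / 2;
        let (start, end) = ranges[mid];
        if start > addr {
            right = mid; // range starts past addr: search left half
        } else if end <= addr {
            left = mid + 1; // range ends at or before addr: search right half
        } else {
            return Ok(mid); // start <= addr < end
        }
    }
    Err(format!("no chunk covers addr {}", addr))
}
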
4 changes: 2 additions & 2 deletions storage/src/test.rs
@@ -34,8 +34,8 @@ impl BlobReader for MockBackend {

     fn prefetch_blob_data_range(
         &self,
-        _blob_readahead_offset: u32,
-        _blob_readahead_size: u32,
+        _blob_readahead_offset: u64,
+        _blob_readahead_size: u64,
     ) -> BackendResult<()> {
         Ok(())
     }
