Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: update BlobPrefetchRequest fields type from u32 to u64 #349

Merged
merged 3 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions blobfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ use rafs::{
use serde::Deserialize;
use std::any::Any;
#[cfg(feature = "virtiofs")]
use std::convert::TryInto;
#[cfg(feature = "virtiofs")]
use std::ffi::CStr;
use std::ffi::CString;
#[cfg(feature = "virtiofs")]
Expand Down Expand Up @@ -165,8 +163,6 @@ impl BootstrapArgs {
/// combination of mount namespaces and the pivot_root system call.
pub struct BlobFs {
pfs: PassthroughFs,
#[cfg(feature = "virtiofs")]
cfg: Config,
#[allow(dead_code)]
bootstrap_args: BootstrapArgs,
}
Expand All @@ -176,14 +172,10 @@ impl BlobFs {
pub fn new(cfg: Config) -> io::Result<BlobFs> {
trace!("BlobFs config is: {:?}", cfg);

#[cfg(feature = "virtiofs")]
let cfg_bak = cfg.clone();
let bootstrap_args = Self::load_bootstrap(&cfg)?;
let pfs = PassthroughFs::new(cfg.ps_config)?;
Ok(BlobFs {
pfs,
#[cfg(feature = "virtiofs")]
cfg: cfg_bak,
bootstrap_args,
})
}
Expand Down
50 changes: 28 additions & 22 deletions blobfs/src/sync_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,18 @@ use storage::device::BlobPrefetchRequest;

impl BlobFs {
#[cfg(feature = "virtiofs")]
fn get_blob_id_and_size(&self, inode: Inode) -> io::Result<(String, i64)> {
fn check_st_size(blob_id: &Path, size: i64) -> io::Result<()> {
if size < 0 {
return Err(einval!(format!(
"load_chunks_on_demand: blob_id {:?}, size: {:?} is less than 0",
blob_id, size
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it supposed to be if size < 0?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a mistakes. I will fix it

)));
}
Ok(())
}

#[cfg(feature = "virtiofs")]
fn get_blob_id_and_size(&self, inode: Inode) -> io::Result<(String, u64)> {
// locate blob file that the inode refers to
let blob_id_full_path = self.pfs.readlinkat_proc_file(inode)?;
let parent = blob_id_full_path
Expand All @@ -37,12 +48,6 @@ impl BlobFs {
blob_id_full_path
);

debug_assert!(
parent
== Path::new(self.cfg.ps_config.root_dir.as_str())
.join(self.bootstrap_args.blob_cache_dir.as_str())
);

let blob_file = Self::open_file(
libc::AT_FDCWD,
&blob_id_full_path.as_path(),
Expand All @@ -60,31 +65,32 @@ impl BlobFs {

trace!("load_chunks_on_demand: blob_id {:?}", blob_id);

Ok((blob_id.to_os_string().into_string().unwrap(), st.st_size))
Self::check_st_size(blob_id_full_path.as_path(), st.st_size)?;

Ok((
blob_id.to_os_string().into_string().unwrap(),
st.st_size as u64,
))
}

#[cfg(feature = "virtiofs")]
fn load_chunks_on_demand(&self, inode: Inode, foffset: u64) -> io::Result<()> {
fn load_chunks_on_demand(&self, inode: Inode, offset: u64) -> io::Result<()> {
// prepare BlobPrefetchRequest and call device.prefetch().
// Make sure prefetch doesn't use delay_persist as we need the
// data immediately.
let (blob_id, size) = self.get_blob_id_and_size(inode)?;
let offset: u32 = foffset.try_into().map_err(|_| {
einval!(format!(
"blobfs: load_chunks_on_demand: foffset {} is larger than u32::MAX",
foffset
))
})?;
let len = (size - offset as i64).try_into().map_err(|_| {
einval!(format!(
"blobfs: load_chunks_on_demand: len {} is larger than u32::MAX",
(size - offset as i64)
))
})?;
if size <= offset {
return Err(einval!(format!(
"load_chunks_on_demand: blob_id {:?}, offset {:?} is larger than size {:?}",
blob_id, offset, size
)));
}

let len = size - offset;
let req = BlobPrefetchRequest {
blob_id,
offset,
len: min(len, 0x0020_0000_u32), // 2M range
len: min(len, 0x0020_0000_u64), // 2M range
};

self.bootstrap_args.fetch_range_sync(&[req]).map_err(|e| {
Expand Down
4 changes: 2 additions & 2 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ impl Rafs {
.iter()
.map(|b| BlobPrefetchRequest {
blob_id: b.blob_id().to_owned(),
offset: b.readahead_offset() as u32,
len: b.readahead_size() as u32,
offset: b.readahead_offset(),
len: b.readahead_size(),
})
.collect::<Vec<BlobPrefetchRequest>>();
device.prefetch(&[], &prefetches).unwrap_or_else(|e| {
Expand Down
6 changes: 3 additions & 3 deletions storage/src/backend/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl BlobReader for LocalFsEntry {
.map_err(|e| LocalFsError::ReadVecBlob(e).into())
}

fn prefetch_blob_data_range(&self, ra_offset: u32, ra_size: u32) -> BackendResult<()> {
fn prefetch_blob_data_range(&self, ra_offset: u64, ra_size: u64) -> BackendResult<()> {
if !self.readahead {
return Ok(());
}
Expand Down Expand Up @@ -153,13 +153,13 @@ impl BlobReader for LocalFsEntry {
}

// Prefetch data according to the hint if it's valid.
let end = ra_offset as u64 + ra_size as u64;
let end = ra_offset + ra_size;
if ra_size != 0 && end <= blob_size {
info!(
"kick off hinted blob readahead offset {} len {}",
ra_offset, ra_size
);
readahead(self.file.as_raw_fd(), ra_offset as u64, end);
readahead(self.file.as_raw_fd(), ra_offset, end);
}

// start access logging
Expand Down
2 changes: 1 addition & 1 deletion storage/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub trait BlobReader: Send + Sync {
/// This method only prefetch blob data from storage backends, it doesn't cache data in the
/// blob cache subsystem. So it's useful for disk and file system based storage backends, but
/// it may not help for Registry/OSS based storage backends.
fn prefetch_blob_data_range(&self, ra_offset: u32, ra_size: u32) -> BackendResult<()>;
fn prefetch_blob_data_range(&self, ra_offset: u64, ra_size: u64) -> BackendResult<()>;

/// Stop the background data prefetching tasks.
fn stop_data_prefetch(&self) -> BackendResult<()>;
Expand Down
2 changes: 1 addition & 1 deletion storage/src/backend/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl BlobReader for OssReader {
.map(|size| size as usize)?)
}

fn prefetch_blob_data_range(&self, _ra_offset: u32, _ra_size: u32) -> BackendResult<()> {
fn prefetch_blob_data_range(&self, _ra_offset: u64, _ra_size: u64) -> BackendResult<()> {
Err(BackendError::Unsupported(
"Oss backend does not support prefetch as per on-disk blob entries".to_string(),
))
Expand Down
2 changes: 1 addition & 1 deletion storage/src/backend/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ impl BlobReader for RegistryReader {
.map_err(BackendError::Registry)
}

fn prefetch_blob_data_range(&self, _ra_offset: u32, _ra_size: u32) -> BackendResult<()> {
fn prefetch_blob_data_range(&self, _ra_offset: u64, _ra_size: u64) -> BackendResult<()> {
Err(BackendError::Unsupported(
"Registry backend does not support prefetch as per on-disk blob entries".to_string(),
))
Expand Down
7 changes: 2 additions & 5 deletions storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,8 @@ impl AsyncWorkerMgr {
e
);
}
} else if offset < u32::MAX as u64 && size < u32::MAX as u64 {
let _ = cache.reader().prefetch_blob_data_range(
offset as u32,
std::cmp::min(size as u32, u32::MAX - offset as u32),
);
} else {
let _ = cache.reader().prefetch_blob_data_range(offset, size);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By design the prefetch table is supposed to have (offset, size) < u32::MAX.
Can we leave a warn! or info! here about (offset, size) is > u32::MAX?

Copy link
Author

@MercyMM MercyMM Mar 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this patch, u64 is now by design. This does not affect default behavior. If it does not cause potential bugs, I feel that warn! is useless.

}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions storage/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,9 +766,9 @@ pub struct BlobPrefetchRequest {
/// The ID of the blob to prefetch data for.
pub blob_id: String,
/// Offset into the blob to prefetch data.
pub offset: u32,
pub offset: u64,
/// Size of data to prefetch.
pub len: u32,
pub len: u64,
}

/// Trait to provide direct access to underlying uncompressed blob file.
Expand Down
21 changes: 15 additions & 6 deletions storage/src/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,17 +618,21 @@ impl BlobMetaState {
let mut size = self.chunk_count as usize;
let mut left = 0;
let mut right = size;
let mut start = 0;
let mut end = 0;

while left < right {
let mid = left + size / 2;
// SAFETY: the call is made safe by the following invariants:
// - `mid >= 0`
// - `mid < size`: `mid` is limited by `[left; right)` bound.
let entry = unsafe { chunks.get_unchecked(mid) };
let (start, end) = if compressed {
(entry.compressed_offset(), entry.compressed_end())
if compressed {
start = entry.compressed_offset();
end = entry.compressed_end();
} else {
(entry.uncompressed_offset(), entry.uncompressed_end())
start = entry.uncompressed_offset();
end = entry.uncompressed_end();
};

if start > addr {
Expand All @@ -642,7 +646,12 @@ impl BlobMetaState {
size = right - left;
}

Err(einval!())
// if addr == self.chunks[last].compressed_offset, return einval
// with error msg.
Err(einval!(format!(
"start: {}, end: {}, addr: {}",
start, end, addr
)))
}
}

Expand Down Expand Up @@ -885,8 +894,8 @@ mod tests {

fn prefetch_blob_data_range(
&self,
_blob_readahead_offset: u32,
_blob_readahead_size: u32,
_blob_readahead_offset: u64,
_blob_readahead_size: u64,
) -> BackendResult<()> {
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions storage/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl BlobReader for MockBackend {

fn prefetch_blob_data_range(
&self,
_blob_readahead_offset: u32,
_blob_readahead_size: u32,
_blob_readahead_offset: u64,
_blob_readahead_size: u64,
) -> BackendResult<()> {
Ok(())
}
Expand Down