Skip to content

Commit

Permalink
fscache: add prefetch_all
Browse files Browse the repository at this point in the history
If prefetch_config.enable = true, prefetch all data after open
data blob.

Signed-off-by: Hongnan Li <hongnan.li@linux.alibaba.com>
  • Loading branch information
Hongnan Li authored and hsiangkao committed May 25, 2022
1 parent 497b45d commit f09f83d
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
3 changes: 3 additions & 0 deletions api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct BlobCacheEntry {
/// Domain id for the blob, which is used to group cached blobs into management domains.
#[serde(default)]
pub domain_id: String,
#[serde(default)]
pub fs_prefetch: Value,
}

/// Configuration information for a list of cached blob objects.
Expand Down Expand Up @@ -161,6 +163,7 @@ pub enum MetricsErrorKind {

/// Errors generated by/related to the API service, sent back through `ApiResponse`.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum ApiError {
/// Daemon internal error
DaemonAbnormal(DaemonErrorKind),
Expand Down
2 changes: 1 addition & 1 deletion rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ fn default_threads_count() -> usize {
8
}

fn default_merging_size() -> usize {
pub fn default_merging_size() -> usize {
128 * 1024
}

Expand Down
5 changes: 4 additions & 1 deletion src/bin/nydusd/blob_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ impl BlobCacheMgr {
cache_compressed: false,
cache_config: entry.blob_config.cache_config.clone(),
cache_validate: false,
prefetch_config: Default::default(),
prefetch_config: match serde_json::from_value(entry.fs_prefetch.clone()) {
Ok(fs_prefetch) => fs_prefetch,
Err(_e) => Default::default(),
},
},
});

Expand Down
60 changes: 51 additions & 9 deletions src/bin/nydusd/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

//! Handler to cooperate with Linux fscache subsystem for blob cache.

use std::cmp;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs::{File, OpenOptions};
Expand All @@ -18,6 +19,7 @@ use std::sync::{Arc, Barrier, Mutex, MutexGuard};
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token, Waker};
use storage::cache::BlobCache;
use storage::device::BlobPrefetchRequest;
use storage::factory::BLOB_FACTORY;

use crate::blob_cache::{
Expand Down Expand Up @@ -407,23 +409,63 @@ impl FsCacheHandler {
) -> String {
let mut state = self.state.lock().unwrap();

if state.id_to_object_map.contains_key(&hdr.object_id) {
unsafe {
libc::close(msg.fd as i32);
};
format!("copen {},{}", hdr.msg_id, -libc::EALREADY)
} else {
use std::collections::hash_map::Entry::Vacant;
if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) {
match self.create_data_blob_object(&config, msg.fd) {
Err(s) => format!("copen {},{}", hdr.msg_id, s),
Ok((blob, blob_size)) => {
state
.id_to_object_map
.insert(hdr.object_id, (FsCacheObject::DataBlob(blob), msg.fd));
e.insert((FsCacheObject::DataBlob(blob.clone()), msg.fd));
state.id_to_config_map.insert(hdr.object_id, config.clone());
let _ = self.do_prefetch(&config, blob);
format!("copen {},{}", hdr.msg_id, blob_size)
}
}
} else {
unsafe {
libc::close(msg.fd as i32);
};
format!("copen {},{}", hdr.msg_id, -libc::EALREADY)
}
}

pub fn do_prefetch(&self, config: &BlobCacheConfigDataBlob, blob: Arc<dyn BlobCache>) {
let blob_info = config.blob_info().deref();
let factory_config = config.factory_config().deref();
if !factory_config.cache.prefetch_config.enable {
return;
}
let size = match factory_config
.cache
.prefetch_config
.merging_size
.checked_next_power_of_two()
{
None => rafs::fs::default_merging_size() as u64,
Some(1) => rafs::fs::default_merging_size() as u64,
Some(s) => s as u64,
};
let blob_size = blob_info.compressed_size();
let count = (blob_size + size - 1) / size;
let mut blob_req = Vec::with_capacity(count as usize);
let mut pre_offset = 0u64;
for _i in 0..count {
blob_req.push(BlobPrefetchRequest {
blob_id: blob_info.blob_id().to_owned(),
offset: pre_offset,
len: cmp::min(size, blob_size - pre_offset),
});
pre_offset += size;
if pre_offset > blob_size {
break;
}
}
info!("blob prefetch start");
let _ = std::thread::spawn(move || {
let _ = blob
.prefetch(blob.clone(), &blob_req, &[])
.map_err(|_e| eio!("failed to prefetch blob data"));
let _ = blob.stop_prefetch();
});
}

/// The `fscache` factory essentially creates a namespace for blob objects cached by the
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<'a, F: FnMut(BlobIoRange)> BlobIoMergeState<'a, F> {
}

/// Configuration information for blob data prefetching.
#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)]
#[derive(Clone, Debug, Default, Eq, Hash, PartialEq, Deserialize, Serialize)]
pub struct BlobPrefetchConfig {
/// Whether to enable blob data prefetching.
pub enable: bool,
Expand Down

0 comments on commit f09f83d

Please sign in to comment.