nydusd: support sharing data blobs within a domain
The current implementation doesn't support sharing data blobs among
bootstraps/container images, so enhance it to support data blob sharing.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
jiangliu committed Jun 6, 2022
1 parent c74a772 commit 338d75a
Showing 1 changed file with 114 additions and 29 deletions.
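The change makes data blobs shareable within a blob-cache domain by reference counting them instead of rejecting duplicate registrations: re-registering a data blob that already exists in the domain bumps its count, and the blob is only dropped once the last bootstrap referencing it is removed. The sketch below is a minimal, self-contained illustration of that scheme, not the nydusd API: `SharedDataBlob`, `DomainCache`, the `"/"` key splitter, and the `add`/`remove` helpers are hypothetical stand-ins for the `BlobCacheConfigDataBlob`, `BlobCacheState`, `try_add`, and `remove` items touched in the diff below.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a reference-counted data blob entry
/// (the patch's real type is `BlobCacheConfigDataBlob`).
struct SharedDataBlob {
    ref_count: AtomicU32,
}

/// Hypothetical stand-in for the domain-scoped cache map kept by `BlobCacheState`.
#[derive(Default)]
struct DomainCache {
    blobs: HashMap<String, Arc<SharedDataBlob>>,
}

impl DomainCache {
    /// Re-adding a data blob that already exists in the domain only bumps its
    /// reference count instead of failing with `AlreadyExists`.
    fn add(&mut self, domain_id: &str, blob_id: &str) {
        // The real key comes from `generate_blob_key()`; the "/" splitter is illustrative.
        let key = format!("{}/{}", domain_id, blob_id);
        self.blobs
            .entry(key)
            .and_modify(|blob| {
                blob.ref_count.fetch_add(1, Ordering::AcqRel);
            })
            .or_insert_with(|| {
                Arc::new(SharedDataBlob {
                    ref_count: AtomicU32::new(1),
                })
            });
    }

    /// Dropping a reference removes the blob only when the last user is gone.
    fn remove(&mut self, domain_id: &str, blob_id: &str) {
        let key = format!("{}/{}", domain_id, blob_id);
        let last_ref = match self.blobs.get(&key) {
            Some(blob) => blob.ref_count.fetch_sub(1, Ordering::AcqRel) == 1,
            None => false,
        };
        if last_ref {
            self.blobs.remove(&key);
        }
    }
}

fn main() {
    let mut cache = DomainCache::default();
    // Two bootstraps in the same domain reference the same data blob.
    cache.add("domain1", "blob-a");
    cache.add("domain1", "blob-a");

    cache.remove("domain1", "blob-a");
    assert!(cache.blobs.contains_key("domain1/blob-a")); // still referenced once
    cache.remove("domain1", "blob-a");
    assert!(cache.blobs.is_empty()); // last reference released
}
```

As the test added at the bottom of the diff exercises, two bootstraps registered in the same domain can now share the same data blobs, and removing one bootstrap leaves the shared blobs in place until the last referencing bootstrap is removed as well.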
143 changes: 114 additions & 29 deletions src/bin/nydusd/blob_cache.rs
@@ -6,6 +6,7 @@
use std::collections::HashMap;
use std::io::{Error, ErrorKind, Result};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};

use nydus_api::http::{
@@ -28,27 +29,31 @@ pub fn generate_blob_key(domain_id: &str, blob_id: &str) -> String {
}

/// Configuration information for cached bootstrap blob objects.
#[derive(Clone)]
pub struct BlobCacheConfigBootstrap {
blob_id: String,
scoped_blob_id: String,
path: PathBuf,
factory_config: Arc<FactoryConfig>,
data_blobs: Mutex<Vec<Arc<BlobCacheConfigDataBlob>>>,
}

impl BlobCacheConfigBootstrap {
/// Get file path of the bootstrap blob file.
pub fn path(&self) -> &Path {
&self.path
}

fn add_data_blob(&self, blob: Arc<BlobCacheConfigDataBlob>) {
self.data_blobs.lock().unwrap().push(blob);
}
}

/// Configuration information for cached data blob objects.
#[derive(Clone)]
pub struct BlobCacheConfigDataBlob {
blob_info: Arc<BlobInfo>,
scoped_blob_id: String,
factory_config: Arc<FactoryConfig>,
ref_count: AtomicU32,
}

impl BlobCacheConfigDataBlob {
@@ -84,6 +89,7 @@ impl BlobCacheObjectConfig {
blob_info,
scoped_blob_id,
factory_config,
ref_count: AtomicU32::new(1),
}))
}

@@ -100,6 +106,7 @@ impl BlobCacheObjectConfig {
scoped_blob_id,
path,
factory_config,
data_blobs: Mutex::new(Vec::new()),
}))
}

@@ -109,6 +116,13 @@ impl BlobCacheObjectConfig {
BlobCacheObjectConfig::DataBlob(o) => &o.scoped_blob_id,
}
}

fn bootstrap_config(&self) -> Option<Arc<BlobCacheConfigBootstrap>> {
match self {
BlobCacheObjectConfig::Bootstrap(o) => Some(o.clone()),
BlobCacheObjectConfig::DataBlob(_o) => None,
}
}
}

#[derive(Default)]
@@ -125,17 +139,29 @@ impl BlobCacheState {

fn try_add(&mut self, config: BlobCacheObjectConfig) -> Result<()> {
let key = config.get_key();
if self.id_to_config_map.contains_key(key) {
return Err(Error::new(
ErrorKind::AlreadyExists,
"blob configuration information already exists",
));

if let Some(entry) = self.id_to_config_map.get(key) {
match entry {
BlobCacheObjectConfig::Bootstrap(_o) => {
// Bootstrap blob must be unique.
return Err(Error::new(
ErrorKind::AlreadyExists,
"blob_cache: bootstrap blob already exists",
));
}
BlobCacheObjectConfig::DataBlob(o) => {
// Data blob is reference counted.
o.ref_count.fetch_add(1, Ordering::AcqRel);
}
}
} else {
self.id_to_config_map.insert(key.to_owned(), config);
}
self.id_to_config_map.insert(key.to_owned(), config);

Ok(())
}

fn remove(&mut self, param: &BlobCacheObjectId) {
fn remove(&mut self, param: &BlobCacheObjectId) -> Result<()> {
if param.blob_id.is_empty() && !param.domain_id.is_empty() {
// Remove all blobs associated with the domain.
let scoped_blob_prefix = format!("{}{}", param.domain_id, ID_SPLITTER);
@@ -148,16 +174,33 @@ impl BlobCacheState {
}
});
} else {
let scoped_blob_prefix = if param.domain_id.is_empty() {
format!("{}", param.blob_id)
} else {
format!("{}{}{}", param.domain_id, ID_SPLITTER, param.blob_id)
};
self.id_to_config_map.retain(|_k, v| match v {
BlobCacheObjectConfig::Bootstrap(o) => !o.scoped_blob_id.eq(&scoped_blob_prefix),
BlobCacheObjectConfig::DataBlob(o) => !o.scoped_blob_id.eq(&scoped_blob_prefix),
});
let mut data_blobs = Vec::new();
let mut is_bootstrap = false;
let scoped_blob_prefix = generate_blob_key(&param.domain_id, &param.blob_id);

match self.id_to_config_map.get(&scoped_blob_prefix) {
None => return Err(enoent!("blob_cache: cache entry not found")),
Some(BlobCacheObjectConfig::Bootstrap(o)) => {
is_bootstrap = true;
data_blobs = o.data_blobs.lock().unwrap().clone();
}
Some(BlobCacheObjectConfig::DataBlob(o)) => {
data_blobs.push(o.clone());
}
}

for entry in data_blobs {
if entry.ref_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.id_to_config_map.remove(&entry.scoped_blob_id);
}
}

if is_bootstrap {
self.id_to_config_map.remove(&scoped_blob_prefix);
}
}

Ok(())
}

fn get(&self, key: &str) -> Option<BlobCacheObjectConfig> {
@@ -216,9 +259,7 @@ impl BlobCacheMgr {

/// Remove a blob object from the cache manager.
pub fn remove_blob_entry(&self, param: &BlobCacheObjectId) -> Result<()> {
let mut state = self.get_state();
state.remove(param);
Ok(())
self.get_state().remove(param)
}

/// Get configuration information for the blob with `key`.
@@ -310,29 +351,48 @@ impl BlobCacheMgr {
factory_config: Arc<FactoryConfig>,
) -> Result<()> {
let rs = RafsSuper::load_from_metadata(&path, RafsMode::Direct, true)?;
let meta_config = BlobCacheObjectConfig::new_bootstrap_blob(
let bootstrap = BlobCacheObjectConfig::new_bootstrap_blob(
domain_id.to_string(),
id.to_string(),
path,
factory_config.clone(),
);

let mut state = self.get_state();
state.try_add(meta_config)?;
state.try_add(bootstrap.clone())?;
// Safe to unwrap() because it's a bootstrap.
let bs_obj = bootstrap.bootstrap_config().unwrap();

// Try to add the referenced data blob object if it doesn't exist yet.
for bi in rs.superblock.get_blob_infos() {
debug!(
"blob_cache: found blob {} in domain {}",
"blob_cache: add data blob {} to domain {}",
&bi.blob_id(),
domain_id
);
let blob_config = BlobCacheObjectConfig::new_data_blob(
let data_blob = BlobCacheObjectConfig::new_data_blob(
domain_id.to_string(),
bi,
factory_config.clone(),
);
state.try_add(blob_config)?;
let key = data_blob.get_key().to_string();
let data_blob_config = match &data_blob {
BlobCacheObjectConfig::DataBlob(entry) => entry.clone(),
_ => panic!("blob_cache: internal error"),
};

if let Err(e) = state.try_add(data_blob) {
// Rollback added bootstrap/data blobs.
let id = BlobCacheObjectId {
domain_id: domain_id.to_string(),
blob_id: id.to_string(),
};
let _ = state.remove(&id);
return Err(e);
}

// Associate the data blob with the bootstrap blob.
bs_obj.add_data_blob(data_blob_config);
}

Ok(())
@@ -399,6 +459,7 @@ mod tests {
scoped_blob_id: "domain1".to_string(),
path: path.clone(),
factory_config,
data_blobs: Mutex::new(Vec::new()),
};
assert_eq!(blob.path(), &path);
}
@@ -490,7 +551,7 @@ mod tests {
cache_config: entry.blob_config.cache_config,
metadata_path: Some(path.to_string()),
};
let entry = BlobCacheEntry {
let mut entry = BlobCacheEntry {
blob_type: BLOB_CACHE_TYPE_BOOTSTRAP.to_string(),
blob_id: "image_v2".to_string(),
blob_config,
@@ -510,7 +571,31 @@
);
assert!(mgr.get_config(&key).is_some());

let state = mgr.get_state();
assert_eq!(state.id_to_config_map.len(), 19);
assert_eq!(mgr.get_state().id_to_config_map.len(), 19);

entry.blob_id = "image_v2_cloned".to_string();
let blob_id_cloned = generate_blob_key(&entry.domain_id, &entry.blob_id);
mgr.add_blob_entry(&entry).unwrap();
assert_eq!(mgr.get_state().id_to_config_map.len(), 20);
assert!(mgr.get_config(&blob_id).is_some());
assert!(mgr.get_config(&blob_id_cloned).is_some());

mgr.remove_blob_entry(&BlobCacheObjectId {
domain_id: entry.domain_id.clone(),
blob_id: "image_v2".to_string(),
})
.unwrap();
assert_eq!(mgr.get_state().id_to_config_map.len(), 19);
assert!(mgr.get_config(&blob_id).is_none());
assert!(mgr.get_config(&blob_id_cloned).is_some());

mgr.remove_blob_entry(&BlobCacheObjectId {
domain_id: entry.domain_id,
blob_id: "image_v2_cloned".to_string(),
})
.unwrap();
assert_eq!(mgr.get_state().id_to_config_map.len(), 0);
assert!(mgr.get_config(&blob_id).is_none());
assert!(mgr.get_config(&blob_id_cloned).is_none());
}
}
