diff --git a/src/bin/nydus-image/core/blob.rs b/src/bin/nydus-image/core/blob.rs
index b1b4a2b07b9..aa4a24acca6 100644
--- a/src/bin/nydus-image/core/blob.rs
+++ b/src/bin/nydus-image/core/blob.rs
@@ -81,7 +81,7 @@ impl Blob {
         Ok(blob_exists)
     }
 
-    fn dump_meta_data(&mut self, blob_ctx: &mut BlobContext) -> Result<()> {
+    pub(crate) fn dump_meta_data(&mut self, blob_ctx: &mut BlobContext) -> Result<()> {
         if !blob_ctx.blob_meta_info_enabled {
             return Ok(());
         }
diff --git a/src/bin/nydus-image/core/blob_compact.rs b/src/bin/nydus-image/core/blob_compact.rs
new file mode 100644
index 00000000000..b37dfed965e
--- /dev/null
+++ b/src/bin/nydus-image/core/blob_compact.rs
@@ -0,0 +1,613 @@
+// Copyright 2020 Ant Group. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::core::blob::Blob;
+use crate::core::bootstrap::Bootstrap;
+use crate::core::chunk_dict::{ChunkDict, HashChunkDict};
+use crate::core::context::{
+    ArtifactStorage, BlobContext, BlobManager, BootstrapManager, BuildContext, BuildOutput,
+    RafsVersion, SourceType,
+};
+use crate::core::node::{ChunkWrapper, Node, WhiteoutSpec};
+use crate::core::tree::Tree;
+use anyhow::Result;
+use nydus_utils::digest::RafsDigest;
+use nydus_utils::try_round_up_4k;
+use rafs::metadata::{RafsMode, RafsSuper};
+use serde::{Deserialize, Serialize};
+use sha2::Digest;
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::sync::Arc;
+use storage::backend::BlobBackend;
+use storage::utils::alloc_buf;
+
+const DEFAULT_COMPACT_BLOB_SIZE: usize = 10 * 1024 * 1024;
+const DEFAULT_MAX_COMPACT_SIZE: usize = 100 * 1024 * 1024;
+
+const fn default_compact_blob_size() -> usize {
+    DEFAULT_COMPACT_BLOB_SIZE
+}
+
+const fn default_max_compact_size() -> usize {
+    DEFAULT_MAX_COMPACT_SIZE
+}
+
+#[derive(Clone, Deserialize, Serialize)]
+pub struct Config {
+    /// rebuild blobs whose used_ratio < min_used_ratio
+    /// used_ratio = (compress_size of all chunks which are referenced by bootstrap) / blob_compress_size
+    /// available value: 0-99, 0 means disable
+    /// hint: it's better to disable this option when there are some shared blobs
+    /// for example: build-cache
+    #[serde(default)]
+    min_used_ratio: u8,
+    /// we compact blobs whose size is less than compact_blob_size
+    #[serde(default = "default_compact_blob_size")]
+    compact_blob_size: usize,
+    /// size of compacted blobs should not be larger than max_compact_size
+    #[serde(default = "default_max_compact_size")]
+    max_compact_size: usize,
+    /// if number of blobs >= layers_to_compact, do compact
+    /// 0 means always try compact
+    #[serde(default)]
+    layers_to_compact: usize,
+    /// local blobs dir, blobs may not have been uploaded to the backend yet
+    /// what's more, new blobs will be output to this dir
+    /// name of blob file should be equal to blob_id
+    blobs_dir: String,
+}
+
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+enum ChunkKey {
+    // for v5, v6 may support later
+    Digest(RafsDigest),
+    // blob_idx, compress_offset, for v6 only
+    Offset(u32, u64),
+}
+
+impl ChunkKey {
+    fn from(c: &ChunkWrapper) -> Self {
+        let is_v5 = matches!(c, ChunkWrapper::V5(_));
+        if is_v5 {
+            Self::Digest(*c.id())
+        } else {
+            Self::Offset(c.blob_index(), c.compressed_offset())
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+struct ChunkSet {
+    chunks: HashMap<ChunkKey, ChunkWrapper>,
+    version: RafsVersion,
+    total_size: usize,
+}
+
+impl ChunkSet {
+    fn new(version: RafsVersion) -> Self {
+        Self {
+            chunks: Default::default(),
+            version,
+            total_size: 0,
+        }
+    }
+
+    fn add_chunk(&mut self, chunk: &ChunkWrapper) {
+        let key = ChunkKey::from(chunk);
+        let old = self.chunks.insert(key, chunk.clone());
+        self.total_size += chunk.compressed_size() as usize;
+        if let Some(c) = old {
+            self.total_size -= c.compressed_size() as usize;
+        }
+    }
+
+    fn get_chunk(&self, key: &ChunkKey) -> Option<&ChunkWrapper> {
+        self.chunks.get(key)
+    }
+
+    fn merge(&mut self, other: Self) {
+        for (_, c) in other.chunks.iter() {
+            self.add_chunk(c);
+        }
+    }
+
+    fn dump(
+        &self,
+        ori_blob_ids: &[String],
+        new_blob_ctx: &mut BlobContext,
+        new_blob_idx: u32,
+        aligned_chunk: bool,
+        backend: &Arc<dyn BlobBackend + Send + Sync>,
+    ) -> Result<Vec<(ChunkWrapper, ChunkWrapper)>> {
+        // sort chunks first, don't break order in original blobs
+        let mut chunks = self.chunks.values().collect::<Vec<&ChunkWrapper>>();
+        chunks.sort_by(|a, b| {
+            if (*a).blob_index() == (*b).blob_index() {
+                (*a).compressed_offset().cmp(&(*b).compressed_offset())
+            } else {
+                (*a).blob_index().cmp(&(*b).blob_index())
+            }
+        });
+        let mut chunks_change = Vec::new();
+        for chunk in chunks {
+            let blob_idx = chunk.blob_index();
+            // get data from backend
+            // todo: merge download requests
+            let reader = backend
+                .get_reader(&ori_blob_ids[blob_idx as usize])
+                .expect("get blob err");
+            let mut buf = alloc_buf(chunk.compressed_size() as usize);
+            reader
+                .read(&mut buf, chunk.compressed_offset())
+                .expect("read blob data err");
+            if let Some(w) = new_blob_ctx.writer.as_mut() {
+                w.write_all(&buf)?;
+            }
+
+            let mut new_chunk = chunk.clone();
+            // file offset field is useless
+            new_chunk.set_index(new_blob_ctx.chunk_count);
+            new_chunk.set_blob_index(new_blob_idx);
+            new_chunk.set_compressed_offset(new_blob_ctx.compress_offset);
+            new_chunk.set_uncompressed_offset(new_blob_ctx.decompress_offset);
+            new_blob_ctx.add_chunk_meta_info(&new_chunk)?;
+            // insert change ops
+            chunks_change.push((chunk.clone(), new_chunk));
+
+            new_blob_ctx.blob_hash.update(&buf);
+            new_blob_ctx.chunk_count += 1;
+            new_blob_ctx.compress_offset += chunk.compressed_size() as u64;
+            new_blob_ctx.compressed_blob_size += chunk.compressed_size() as u64;
+
+            let aligned_size = if aligned_chunk {
+                try_round_up_4k(chunk.uncompressed_size()).unwrap()
+            } else {
+                chunk.uncompressed_size() as u64
+            };
+            new_blob_ctx.decompress_offset += aligned_size;
+            new_blob_ctx.decompressed_blob_size += aligned_size;
+        }
+        new_blob_ctx.blob_id = format!("{:x}", new_blob_ctx.blob_hash.clone().finalize());
+        // dump blob meta for v6
+        Blob::new().dump_meta_data(new_blob_ctx)?;
+        new_blob_ctx.flush()?;
+        Ok(chunks_change)
+    }
+}
+
+#[derive(Clone, Debug)]
+enum State {
+    Original(ChunkSet),
+    ChunkDict,
+    /// delete this blob
+    Delete,
+    /// output chunks as a new blob file
+    Rebuild(ChunkSet),
+}
+
+impl State {
+    fn is_rebuild(&self) -> bool {
+        matches!(self, Self::Rebuild(_))
+    }
+
+    fn merge_blob(&mut self, other: Self) -> Result<()> {
+        let merge_cs = match other {
+            State::Original(cs) => cs,
+            State::Rebuild(cs) => cs,
+            _ => bail!("invalid state"),
+        };
+        match self {
+            State::Rebuild(cs) => {
+                cs.merge(merge_cs);
+            }
+            _ => bail!("invalid state"),
+        }
+        Ok(())
+    }
+
+    fn chunk_total_size(&self) -> Result<usize> {
+        Ok(match self {
+            State::Original(cs) => cs.total_size,
+            State::Rebuild(cs) => cs.total_size,
+            _ => bail!("invalid state"),
+        })
+    }
+}
+
+#[inline]
+fn apply_chunk_change(from: &ChunkWrapper, to: &mut ChunkWrapper) -> Result<()> {
+    ensure!(
+        to.uncompressed_size() == from.uncompressed_size(),
+        "different uncompress size"
+    );
+    ensure!(
+        to.compressed_size() == from.compressed_size(),
+        "different compressed size"
+    );
+
+    to.set_blob_index(from.blob_index());
+    to.set_index(from.index());
+    to.set_uncompressed_offset(from.uncompressed_offset());
+    to.set_compressed_offset(from.compressed_offset());
+    Ok(())
+}
+
+pub struct BlobCompactor {
+    /// original blobs
+    ori_blob_mgr: BlobManager,
+    /// states
+    states: Vec<Option<State>>,
+    /// new blobs
+    new_blob_mgr: BlobManager,
+    /// inode list
+    nodes: Vec<Node>,
+    /// chunk --> list of (node_index, chunk_index in node)
+    c2nodes: HashMap<ChunkKey, Vec<(usize, usize)>>,
+    /// original blob index --> list of (node_index, chunk_index in node)
+    b2nodes: HashMap<u32, Vec<(usize, usize)>>,
+    /// v5 or v6
+    version: RafsVersion,
+    /// blobs backend
+    backend: Arc<dyn BlobBackend + Send + Sync>,
+}
+
+impl BlobCompactor {
+    pub fn new(
+        version: RafsVersion,
+        ori_blob_mgr: BlobManager,
+        nodes: Vec<Node>,
+        backend: Arc<dyn BlobBackend + Send + Sync>,
+    ) -> Result<Self> {
+        let ori_blobs_number = ori_blob_mgr.len();
+        let mut compactor = Self {
+            ori_blob_mgr,
+            new_blob_mgr: BlobManager::new(),
+            states: vec![None; ori_blobs_number],
+            nodes,
+            c2nodes: HashMap::new(),
+            b2nodes: HashMap::new(),
+            version,
+            backend,
+        };
+        compactor.load_chunk_dict_blobs();
+        compactor.load_and_dedup_chunks()?;
+        Ok(compactor)
+    }
+
+    pub fn is_v6(&self) -> bool {
+        self.version.is_v6()
+    }
+
+    fn load_and_dedup_chunks(&mut self) -> Result<()> {
+        // tmp ChunkSet, for dedup
+        let mut all_chunks = ChunkSet::new(self.version);
+        let chunk_dict = self.get_chunk_dict();
+        for node_idx in 0..self.nodes.len() {
+            let node = &mut self.nodes[node_idx];
+            for chunk_idx in 0..node.chunks.len() {
+                let chunk = &mut node.chunks[chunk_idx];
+                let chunk_key = ChunkKey::from(chunk);
+                if !matches!(
+                    self.states[chunk.blob_index() as usize],
+                    Some(State::ChunkDict)
+                ) {
+                    // dedup by chunk dict
+                    if let Some(c) = chunk_dict.get_chunk(chunk.id()) {
+                        apply_chunk_change(c, chunk)?;
+                    } else {
+                        match all_chunks.get_chunk(&chunk_key) {
+                            Some(c) => {
+                                // do dedup
+                                apply_chunk_change(c, chunk)?;
+                            }
+                            None => {
+                                all_chunks.add_chunk(chunk);
+                                // add to per blob ChunkSet
+                                let blob_index = chunk.blob_index() as usize;
+                                if self.states[blob_index].is_none() {
+                                    self.states[blob_index]
+                                        .replace(State::Original(ChunkSet::new(self.version)));
+                                }
+                                if let Some(State::Original(cs)) = &mut self.states[blob_index] {
+                                    cs.add_chunk(chunk);
+                                }
+                            }
+                        };
+                    }
+                }
+                // construct blobs/chunk --> nodes index map
+                match self.c2nodes.get_mut(&chunk_key) {
+                    None => {
+                        self.c2nodes.insert(chunk_key, vec![(node_idx, chunk_idx)]);
+                    }
+                    Some(list) => {
+                        list.push((node_idx, chunk_idx));
+                    }
+                };
+                match self.b2nodes.get_mut(&chunk.blob_index()) {
+                    None => {
+                        self.b2nodes
+                            .insert(chunk.blob_index(), vec![(node_idx, chunk_idx)]);
+                    }
+                    Some(list) => {
+                        list.push((node_idx, chunk_idx));
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn get_chunk_dict(&self) -> Arc<dyn ChunkDict> {
+        self.ori_blob_mgr.get_chunk_dict()
+    }
+
+    fn load_chunk_dict_blobs(&mut self) {
+        let chunk_dict = self.get_chunk_dict();
+        let blobs = chunk_dict.get_blobs();
+        for i in 0..blobs.len() {
+            let real_blob_idx = chunk_dict.get_real_blob_idx(i as u32) as usize;
+            self.states[real_blob_idx].replace(State::ChunkDict);
+        }
+    }
+
+    fn apply_blob_move(&mut self, from: u32, to: u32) -> Result<()> {
+        if let Some(idx_list) = self.b2nodes.get(&from) {
+            for (node_idx, chunk_idx) in idx_list.iter() {
+                ensure!(
+                    self.nodes[*node_idx].chunks[*chunk_idx].blob_index() == from,
+                    "unexpected blob_index of chunk"
+                );
+                self.nodes[*node_idx].chunks[*chunk_idx].set_blob_index(to);
+            }
+        }
+        Ok(())
+    }
+
+    fn apply_chunk_change(&mut self, c: &(ChunkWrapper, ChunkWrapper)) -> Result<()> {
+        if let Some(idx_list) = self.c2nodes.get(&ChunkKey::from(&c.0)) {
+            for (node_idx, chunk_idx) in idx_list.iter() {
+                apply_chunk_change(&c.1, &mut self.nodes[*node_idx].chunks[*chunk_idx])?;
+            }
+        }
+        Ok(())
+    }
+
+    fn delete_unused_blobs(&mut self) {
+        for i in 0..self.states.len() {
+            if self.states[i].is_none() {
+                info!(
+                    "delete unused blob {}",
+                    self.ori_blob_mgr.get_blob(i).unwrap().blob_id
+                );
+                self.states[i].replace(State::Delete);
+            }
+        }
+    }
+
+    fn prepare_to_rebuild(&mut self, idx: usize) -> Result<()> {
+        if self.states[idx].as_ref().unwrap().is_rebuild() {
+            return Ok(());
+        }
+        if let Some(cs) = self.states[idx].take() {
+            match cs {
+                State::Original(cs) => {
+                    self.states[idx].replace(State::Rebuild(cs));
+                }
+                _ => bail!("invalid state"),
+            }
+        }
+        Ok(())
+    }
+
+    fn try_rebuild_blobs(&mut self, ratio: u8) {
+        for idx in 0..self.ori_blob_mgr.len() {
+            let blob_info = self.ori_blob_mgr.get_blob(idx).unwrap();
+            // calculate ratio
+            let used_ratio = match self.states[idx].as_ref().unwrap() {
+                State::Original(cs) => {
+                    let compressed_blob_size = if blob_info.compressed_blob_size == 0 {
+                        // get compressed size of blob from backend
+                        self.backend
+                            .get_reader(&blob_info.blob_id)
+                            .expect("get blob failed")
+                            .blob_size()
+                            .expect("get blob size failed")
+                    } else {
+                        blob_info.compressed_blob_size
+                    };
+                    (cs.total_size * 100 / compressed_blob_size as usize) as u8
+                }
+                _ => 100_u8,
+            };
+            info!("blob {} used ratio {}%", blob_info.blob_id, used_ratio);
+            if used_ratio < ratio {
+                self.prepare_to_rebuild(idx).unwrap();
+            }
+        }
+    }
+
+    fn merge_blob(&mut self, from: usize, to: usize) -> Result<()> {
+        let s = self.states[from].replace(State::Delete).unwrap();
+        self.states[to].as_mut().unwrap().merge_blob(s)
+    }
+
+    /// use greedy algorithm to merge small blobs(<low)
+    fn try_merge_blobs(&mut self, low: usize, max: usize) -> Result<()> {
+        let mut need_merge_blobs = Vec::new();
+        for idx in 0..self.states.len() {
+            let blob_info = self.ori_blob_mgr.get_blob(idx).unwrap();
+            match self.states[idx].as_ref().unwrap() {
+                State::Original(cs) => {
+                    let blob_size = if blob_info.compressed_blob_size == 0 {
+                        cs.total_size
+                    } else {
+                        blob_info.compressed_blob_size as usize
+                    };
+                    if blob_size < low {
+                        info!("blob {} size {}, try merge", blob_info.blob_id, blob_size);
+                        need_merge_blobs.push((idx, blob_size));
+                    }
+                }
+                State::Rebuild(cs) => {
+                    if cs.total_size < low {
+                        info!(
+                            "blob {} size {}, try merge",
+                            blob_info.blob_id, cs.total_size
+                        );
+                        need_merge_blobs.push((idx, cs.total_size));
+                    }
+                }
+                _ => {}
+            }
+        }
+        // sort by size
+        need_merge_blobs.sort_by(|(_, len1), (_, len2)| len1.cmp(len2));
+        // try merge
+        if need_merge_blobs.len() < 2 {
+            return Ok(());
+        }
+        let mut merge_to = need_merge_blobs[0].0;
+        for (blob_idx, _) in need_merge_blobs.iter().skip(1) {
+            let before_size = self.states[merge_to].as_ref().unwrap().chunk_total_size()?;
+            let append_size = self.states[*blob_idx]
+                .as_ref()
+                .unwrap()
+                .chunk_total_size()?;
+            if before_size + append_size <= max {
+                self.prepare_to_rebuild(merge_to)?;
+                self.merge_blob(*blob_idx, merge_to)?;
+            } else {
+                merge_to = *blob_idx;
+            }
+        }
+        Ok(())
+    }
+
+    fn original_blob_ids(&self) -> Vec<String> {
+        self.ori_blob_mgr
+            .get_blobs()
+            .into_iter()
+            .flatten()
+            .map(|blob| blob.blob_id.clone())
+            .collect()
+    }
+
+    pub fn dump_new_blobs(&mut self, dir: &str, aligned_chunk: bool) -> Result<()> {
+        let ori_blob_ids = self.original_blob_ids();
+        ensure!(self.states.len() == self.ori_blob_mgr.len());
+        for idx in 0..self.states.len() {
+            match self.states[idx].as_ref().unwrap() {
+                State::Original(_) | State::ChunkDict => {
+                    info!("do nothing to blob {}", ori_blob_ids[idx]);
+                    // already exists, no need to dump
+                    if let Some(ctx) = self.ori_blob_mgr.take_blob(idx) {
+                        let blob_idx = self.new_blob_mgr.alloc_index()?;
+                        if blob_idx != idx as u32 {
+                            self.apply_blob_move(idx as u32, blob_idx)?;
+                        }
+                        self.new_blob_mgr.add(Some(ctx));
+                    }
+                }
+                State::Delete => {
+                    info!("delete blob {}", ori_blob_ids[idx]);
+                }
+                State::Rebuild(cs) => {
+                    let blob_storage = ArtifactStorage::FileDir(PathBuf::from(dir));
+                    let mut blob_ctx = BlobContext::new(String::from(""), Some(blob_storage))?;
+                    blob_ctx.set_meta_info_enabled(self.is_v6());
+                    let blob_idx = self.new_blob_mgr.alloc_index()?;
+                    let new_chunks = cs.dump(
+                        &ori_blob_ids,
+                        &mut blob_ctx,
+                        blob_idx,
+                        aligned_chunk,
+                        &self.backend,
+                    )?;
+                    for change_chunk in new_chunks.iter() {
+                        self.apply_chunk_change(change_chunk)?;
+                    }
+                    info!("rebuild blob {} successfully", blob_ctx.blob_id);
+                    self.new_blob_mgr.add(Some(blob_ctx));
+                }
+            }
+        }
+        Ok(())
+    }
+
+    pub fn compact(&mut self, cfg: &Config) -> Result<()> {
+        self.delete_unused_blobs();
+        self.try_rebuild_blobs(cfg.min_used_ratio);
+        self.try_merge_blobs(cfg.compact_blob_size, cfg.max_compact_size)?;
+        Ok(())
+    }
+
+    pub fn do_compact(
+        s_boostrap: PathBuf,
+        d_bootstrap: PathBuf,
+        chunk_dict: Option<Arc<dyn ChunkDict>>,
+        backend: Arc<dyn BlobBackend + Send + Sync>,
+        cfg: &Config,
+    ) -> Result<Option<BuildOutput>> {
+        let rs =
+            RafsSuper::load_from_metadata(s_boostrap.to_str().unwrap(), RafsMode::Direct, true)?;
+        info!("load bootstrap {:?} successfully", s_boostrap);
+        let mut build_ctx = BuildContext::new(
+            "".to_string(),
+            false,
+            rs.meta.get_compressor(),
+            rs.meta.get_digester(),
+            rs.meta.explicit_uidgid(),
+            // useless args
+            WhiteoutSpec::Oci,
+            SourceType::Directory,
+            PathBuf::from(""),
+            Default::default(),
+            None,
+        );
+        let mut bootstrap_mgr =
+            BootstrapManager::new(ArtifactStorage::SingleFile(d_bootstrap), None);
+        let mut bootstrap_ctx = bootstrap_mgr.create_ctx()?;
+        let mut ori_blob_mgr = BlobManager::new();
+        ori_blob_mgr.from_blob_table(rs.superblock.get_blob_infos());
+        if let Some(dict) = chunk_dict {
+            ori_blob_mgr.set_chunk_dict(dict);
+            ori_blob_mgr.extend_blob_table_from_chunk_dict()?;
+        }
+        if ori_blob_mgr.len() < cfg.layers_to_compact {
+            return Ok(None);
+        }
+        let mut _dict = HashChunkDict::default();
+        let mut tree = Tree::from_bootstrap(&rs, &mut _dict)?;
+        let mut bootstrap = Bootstrap::new()?;
+        bootstrap.build(&mut build_ctx, &mut bootstrap_ctx, &mut tree)?;
+        let mut nodes = Vec::new();
+        // move out nodes
+        std::mem::swap(&mut bootstrap_ctx.nodes, &mut nodes);
+        let mut compactor = Self::new(build_ctx.fs_version, ori_blob_mgr, nodes, backend.clone())?;
+        compactor.compact(cfg)?;
+        compactor.dump_new_blobs(&cfg.blobs_dir, build_ctx.aligned_chunk)?;
+        if compactor.new_blob_mgr.len() == 0 {
+            info!("blobs of {:?} have already been optimized", s_boostrap);
+            return Ok(None);
+        }
+        info!("compact blob successfully");
+        // give back nodes
+        std::mem::swap(&mut bootstrap_ctx.nodes, &mut compactor.nodes);
+        // blobs have already been dumped, dump bootstrap only
+        if compactor.is_v6() {
+            let blob_table = compactor.new_blob_mgr.to_blob_table_v6(&build_ctx, None)?;
+            bootstrap.dump_rafsv6(&mut build_ctx, &mut bootstrap_ctx, &blob_table)?;
+        } else {
+            let blob_table = compactor.new_blob_mgr.to_blob_table_v5(&build_ctx, None)?;
+            bootstrap.dump_rafsv5(&mut build_ctx, &mut bootstrap_ctx, &blob_table)?;
+        }
+        bootstrap_mgr.add(bootstrap_ctx);
+        Ok(Some(BuildOutput::new(
+            &compactor.new_blob_mgr,
+            &bootstrap_mgr,
+        )?))
+    }
+}
diff --git a/src/bin/nydus-image/core/context.rs b/src/bin/nydus-image/core/context.rs
index 54f75ec7d02..2705e52aa1a 100644
--- a/src/bin/nydus-image/core/context.rs
+++ b/src/bin/nydus-image/core/context.rs
@@ -439,6 +439,14 @@ impl BlobManager {
         self.blobs.iter().collect()
     }
 
+    pub fn get_blob(&self, idx: usize) -> Option<&BlobContext> {
+        self.blobs.get(idx).unwrap_or(&None).as_ref()
+    }
+
+    pub fn take_blob(&mut self, idx: usize) -> Option<BlobContext> {
+        self.blobs[idx].take()
+    }
+
     pub fn get_last_blob(&self) -> Option<&BlobContext> {
         self.blobs.last().unwrap_or(&None).as_ref()
     }
diff --git a/src/bin/nydus-image/core/mod.rs b/src/bin/nydus-image/core/mod.rs
index bdf549436f6..fd037c3fd22 100644
--- a/src/bin/nydus-image/core/mod.rs
+++ b/src/bin/nydus-image/core/mod.rs
@@ -3,6 +3,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 pub(crate) mod blob;
+pub(crate) mod blob_compact;
 pub(crate) mod bootstrap;
 pub(crate) mod chunk_dict;
 pub(crate) mod context;
diff --git a/src/bin/nydus-image/core/node.rs b/src/bin/nydus-image/core/node.rs
index 1bf7dda66dd..acbc4c21eb8 100644
--- a/src/bin/nydus-image/core/node.rs
+++ b/src/bin/nydus-image/core/node.rs
@@ -1544,6 +1544,13 @@ impl ChunkWrapper {
         }
     }
 
+    pub fn set_compressed_offset(&mut self, offset: u64) {
+        match self {
+            ChunkWrapper::V5(c) => c.compress_offset = offset,
+            ChunkWrapper::V6(c) => c.compress_offset = offset,
+        }
+    }
+
     pub fn compressed_size(&self) -> u32 {
         match self {
             ChunkWrapper::V5(c) => c.compress_size,
@@ -1558,6 +1565,13 @@
         }
     }
 
+    pub fn set_uncompressed_offset(&mut self, offset: u64) {
+        match self {
+            ChunkWrapper::V5(c) => c.uncompress_offset = offset,
+            ChunkWrapper::V6(c) => c.uncompress_offset = offset,
+        }
+    }
+
     pub fn uncompressed_size(&self) -> u32 {
         match self {
             ChunkWrapper::V5(c) => c.uncompress_size,
diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs
index 1c8cbc5f736..9173682d1ac 100644
--- a/src/bin/nydus-image/main.rs
+++ b/src/bin/nydus-image/main.rs
@@ -15,7 +15,7 @@ extern crate serde_json;
 #[macro_use]
 extern crate lazy_static;
 
-use std::fs::{self, metadata, DirEntry, OpenOptions};
+use std::fs::{self, metadata, DirEntry, File, OpenOptions};
 use std::path::{Path, PathBuf};
 
 use anyhow::{bail, Context, Result};
@@ -29,6 +29,7 @@ use rafs::RafsIoReader;
 use storage::{compress, RAFS_DEFAULT_CHUNK_SIZE};
 
 use crate::builder::{Builder, DiffBuilder, DirectoryBuilder, StargzBuilder};
+use crate::core::blob_compact::BlobCompactor;
 use crate::core::chunk_dict::import_chunk_dict;
 use crate::core::context::{
     ArtifactStorage, BlobManager, BootstrapManager, BuildContext, BuildOutput, BuildOutputBlob,
@@ -39,6 +40,7 @@ use crate::core::prefetch::Prefetch;
 use crate::core::tree;
 use crate::trace::{EventTracerClass, TimingTracerClass, TraceClass};
 use crate::validator::Validator;
+use storage::factory::{BackendConfig, BlobFactory};
 
 #[macro_use]
 mod trace;
@@ -198,8 +200,7 @@ fn main() -> Result<()> {
                         .required_unless("source-type")
                         .required_unless("blob-dir")
                         .takes_value(true)
-                )
-                .arg(
+                ).arg(
                     Arg::with_name("blob-id")
                         .long("blob-id")
                         .help("blob id (as object id in backend/oss)")
@@ -410,6 +411,60 @@ fn main() -> Result<()> {
                         .takes_value(true)
                 )
         )
+        .subcommand(
+            SubCommand::with_name("compact")
+                .about("(experimental) Compact a specific nydus image, remove unused chunks in blobs, merge small blobs")
+                .arg(
+                    Arg::with_name("bootstrap")
+                        .long("bootstrap")
+                        .short("B")
+                        .help("bootstrap to compact")
+                        .required(true)
+                        .takes_value(true),
+                )
+                .arg(
+                    Arg::with_name("config")
+                        .long("config")
+                        .short("C")
+                        .help("config file for compactor")
+                        .required(true)
+                        .takes_value(true),
+                )
+                .arg(
+                    Arg::with_name("backend-type")
+                        .long("backend-type")
+                        .help("type of backend")
+                        .required(true)
+                        .takes_value(true),
+                )
+                .arg(
+                    Arg::with_name("backend-config-file")
+                        .long("backend-config-file")
+                        .help("config file of backend")
+                        .required(true)
+                        .takes_value(true),
+                )
+                .arg(
+                    Arg::with_name("chunk-dict")
+                        .long("chunk-dict")
+                        .short("M")
+                        .help("Specify a chunk dictionary for chunk deduplication")
+                        .takes_value(true),
+                )
+                .arg(
+                    Arg::with_name("output-bootstrap")
+                        .long("output-bootstrap")
+                        .short("O")
+                        .help("bootstrap to output, default is source bootstrap with suffix .compact")
+                        .takes_value(true),
+                )
+                .arg(
+                    Arg::with_name("output-json")
+                        .long("output-json")
+                        .short("J")
+                        .help("path to JSON output file")
+                        .takes_value(true))
+        )
         .arg(
             Arg::with_name("log-level")
                 .long("log-level")
@@ -438,6 +493,8 @@ fn main() -> Result<()> {
         Command::inspect(matches)
     } else if let Some(matches) = cmd.subcommand_matches("stat") {
         Command::stat(matches)
+    } else if let Some(matches) = cmd.subcommand_matches("compact") {
+        Command::compact(matches, &build_info)
    } else {
         println!("{}", cmd.usage());
         Ok(())
@@ -574,6 +631,37 @@
         Ok(())
     }
 
+    fn compact(matches: &clap::ArgMatches, build_info: &BuildTimeInfo) -> Result<()> {
+        let bootstrap_path = PathBuf::from(Self::get_bootstrap(matches)?);
+        let dst_bootstrap = match matches.value_of("output-bootstrap") {
+            None => bootstrap_path.with_extension("bootstrap.compact"),
+            Some(s) => PathBuf::from(s),
+        };
+
+        let chunk_dict = match matches.value_of("chunk-dict") {
+            None => None,
+            Some(args) => Some(import_chunk_dict(args)?),
+        };
+
+        let backend_type = matches.value_of("backend-type").unwrap();
+        let backend_file = matches.value_of("backend-config-file").unwrap();
+        let backend_config = BackendConfig::from_file(backend_type, backend_file)?;
+        let backend = BlobFactory::new_backend(backend_config, "compactor")?;
+
+        let config_file_path = matches.value_of("config").unwrap();
+        let file = File::open(config_file_path)
+            .with_context(|| format!("failed to open config file {}", config_file_path))?;
+        let config = serde_json::from_reader(file)
+            .with_context(|| format!("invalid config file {}", config_file_path))?;
+
+        if let Some(build_output) =
+            BlobCompactor::do_compact(bootstrap_path, dst_bootstrap, chunk_dict, backend, &config)?
+        {
+            OutputSerializer::dump(matches, &build_output, &build_info)?;
+        }
+        Ok(())
+    }
+
     fn check(matches: &clap::ArgMatches, build_info: &BuildTimeInfo) -> Result<()> {
         let bootstrap_path = Self::get_bootstrap(matches)?;
         let verbose = matches.is_present("verbose");
diff --git a/storage/src/backend/oss.rs b/storage/src/backend/oss.rs
index ecf5ff75bf9..ecb0befcaae 100644
--- a/storage/src/backend/oss.rs
+++ b/storage/src/backend/oss.rs
@@ -196,7 +196,6 @@ impl BlobReader for OssReader {
             .connection
             .call::<&[u8]>(Method::GET, url.as_str(), None, None, headers, true)
             .map_err(OssError::Request)?;
-
         Ok(resp
             .copy_to(&mut buf)
             .map_err(OssError::Transport)
diff --git a/storage/src/factory.rs b/storage/src/factory.rs
index 9b82b4d373b..c82c742436d 100644
--- a/storage/src/factory.rs
+++ b/storage/src/factory.rs
@@ -170,7 +170,7 @@ impl BlobFactory {
     }
 
     /// Create a storage backend for the blob with id `blob_id`.
-    fn new_backend(
+    pub fn new_backend(
        config: BackendConfig,
        blob_id: &str,
    ) -> IOResult<Arc<dyn BlobBackend + Send + Sync>> {
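
For reference, a minimal sketch of how the new `compact` subcommand wired up above might be driven. The JSON field names mirror the `Config` struct in `blob_compact.rs` (it is deserialized with `serde_json::from_reader` in `Command::compact`), and the flags come from the clap definitions in this patch; the concrete paths, size values, and the `localfs` backend settings are illustrative assumptions rather than part of the change.

    # compact.json -- maps onto blob_compact::Config; sizes shown are the built-in defaults
    {
        "min_used_ratio": 10,
        "compact_blob_size": 10485760,
        "max_compact_size": 104857600,
        "layers_to_compact": 32,
        "blobs_dir": "/path/to/local/blobs"
    }

    # hypothetical invocation; backend-type/backend-config-file depend on where the blobs live
    nydus-image compact \
        --bootstrap ./bootstrap \
        --config ./compact.json \
        --backend-type localfs \
        --backend-config-file ./localfs-config.json \
        --output-bootstrap ./bootstrap.compact \
        --output-json ./compact-output.json

If `--output-bootstrap` is omitted, `Command::compact` writes the compacted bootstrap next to the source one with a `.compact` extension.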