From 1a0660fa38125a0aca0395c58845031c84b85f65 Mon Sep 17 00:00:00 2001
From: zyfjeff
Date: Wed, 30 Aug 2023 20:52:31 +0800
Subject: [PATCH] Add --blob-cache-dir arg to generate raw blob cache and meta
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Generate the blob cache and blob meta through the --blob-cache-dir
parameter, so that nydusd can be started directly from these two files
without going to the backend to download them. This can improve the
performance of data loading in localfs mode.

Signed-off-by: zyfjeff
---
 builder/src/core/blob.rs      | 19 ++++--
 builder/src/core/context.rs   | 48 +++++++++++++-
 builder/src/core/node.rs      | 15 ++++-
 builder/src/lib.rs            |  9 ++-
 smoke/tests/blobcache_test.go | 120 ++++++++++++++++++++++++++++++++++
 smoke/tests/tool/layer.go     |  6 +-
 src/bin/nydus-image/main.rs   | 46 ++++++++++++-
 7 files changed, 248 insertions(+), 15 deletions(-)
 create mode 100644 smoke/tests/blobcache_test.go

diff --git a/builder/src/core/blob.rs b/builder/src/core/blob.rs
index a58c1b35a5f..cefe274f8af 100644
--- a/builder/src/core/blob.rs
+++ b/builder/src/core/blob.rs
@@ -3,15 +3,16 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use std::borrow::Cow;
-use std::io::Write;
+use std::io::{Seek, Write};
+use std::mem::size_of;
 use std::slice;
 
 use anyhow::{Context, Result};
 use nydus_rafs::metadata::RAFS_MAX_CHUNK_SIZE;
 use nydus_storage::device::BlobFeatures;
-use nydus_storage::meta::{toc, BlobMetaChunkArray};
+use nydus_storage::meta::{toc, BlobCompressionContextHeader, BlobMetaChunkArray};
 use nydus_utils::digest::{self, DigestHasher, RafsDigest};
-use nydus_utils::{compress, crypt};
+use nydus_utils::{compress, crypt, try_round_up_4k};
 use sha2::digest::Digest;
 
 use super::layout::BlobLayout;
@@ -194,7 +195,6 @@ impl Blob {
         } else if ctx.blob_tar_reader.is_some() {
             header.set_separate_blob(true);
         };
-
         let mut compressor = Self::get_compression_algorithm_for_meta(ctx);
         let (compressed_data, compressed) = compress::compress(ci_data, compressor)
             .with_context(|| "failed to compress blob chunk info array".to_string())?;
@@ -223,6 +223,17 @@ impl Blob {
         }
         blob_ctx.blob_meta_header = header;
+        if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
+            let mut guard = blob_cache.lock().unwrap();
+            let meta_write = guard.get_blob_meta_writer_mut();
+            let aligned_uncompressed_size = try_round_up_4k(uncompressed_size as u64).unwrap();
+            meta_write.set_len(
+                aligned_uncompressed_size + size_of::<BlobCompressionContextHeader>() as u64,
+            )?;
+            meta_write.write_all(ci_data)?;
+            meta_write.seek(std::io::SeekFrom::Start(aligned_uncompressed_size))?;
+            meta_write.write_all(header.as_bytes())?;
+        }
 
         let encrypted_header =
             crypt::encrypt_with_context(header.as_bytes(), cipher_obj, cipher_ctx, encrypt)?;
         let header_size = encrypted_header.len();
diff --git a/builder/src/core/context.rs b/builder/src/core/context.rs
index 36f48a3c325..da4e8d0e278 100644
--- a/builder/src/core/context.rs
+++ b/builder/src/core/context.rs
@@ -193,7 +193,13 @@ impl Write for ArtifactMemoryWriter {
     }
 }
 
-struct ArtifactFileWriter(ArtifactWriter);
+pub struct ArtifactFileWriter(pub ArtifactWriter);
+
+impl ArtifactFileWriter {
+    pub fn finalize(&mut self, name: Option<String>) -> Result<()> {
+        self.0.finalize(name)
+    }
+}
 
 impl RafsIoWrite for ArtifactFileWriter {
     fn as_any(&self) -> &dyn Any {
@@ -215,6 +221,12 @@ impl RafsIoWrite for ArtifactFileWriter {
     }
 }
 
+impl ArtifactFileWriter {
+    pub fn set_len(&mut self, s: u64) -> std::io::Result<()> {
+        self.0.file.get_mut().set_len(s)
+    }
+}
+
 impl Seek for ArtifactFileWriter {
     fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
         self.0.file.seek(pos)
@@ -367,6 +379,35 @@ impl ArtifactWriter {
     }
 }
 
+pub struct BlobCacheGenerator {
+    blob_data: ArtifactFileWriter,
+    blob_meta: ArtifactFileWriter,
+}
+
+impl BlobCacheGenerator {
+    pub fn new(storage: ArtifactStorage) -> Result<Self> {
+        Ok(BlobCacheGenerator {
+            blob_data: ArtifactFileWriter(ArtifactWriter::new(storage.clone())?),
+            blob_meta: ArtifactFileWriter(ArtifactWriter::new(storage)?),
+        })
+    }
+
+    pub fn get_blob_data_writer_mut(&mut self) -> &mut ArtifactFileWriter {
+        &mut self.blob_data
+    }
+
+    pub fn get_blob_meta_writer_mut(&mut self) -> &mut ArtifactFileWriter {
+        &mut self.blob_meta
+    }
+
+    pub fn finalize(&mut self, name: &str) -> Result<()> {
+        let blob_data_name = format!("{}.blob.data", name);
+        self.blob_data.finalize(Some(blob_data_name))?;
+        let blob_meta_name = format!("{}.blob.meta", name);
+        self.blob_meta.finalize(Some(blob_meta_name))
+    }
+}
+
 /// BlobContext is used to hold the blob information of a layer during build.
 pub struct BlobContext {
     /// Blob id (user specified or sha256(blob)).
@@ -1182,6 +1223,8 @@ pub struct BuildContext {
 
     pub features: Features,
     pub configuration: Arc<ConfigV2>,
+    /// Generate the blob cache and blob meta
+    pub blob_cache_generator: Option<Mutex<BlobCacheGenerator>>,
 }
 
 impl BuildContext {
@@ -1221,7 +1264,6 @@ impl BuildContext {
         } else {
            crypt::Algorithm::None
        };
-
        BuildContext {
            blob_id,
            aligned_chunk,
@@ -1250,6 +1292,7 @@ impl BuildContext {
 
            features,
            configuration: Arc::new(ConfigV2::default()),
+           blob_cache_generator: None,
        }
    }
 
@@ -1299,6 +1342,7 @@ impl Default for BuildContext {
             blob_inline_meta: false,
             features: Features::new(),
             configuration: Arc::new(ConfigV2::default()),
+            blob_cache_generator: None,
         }
     }
 }
diff --git a/builder/src/core/node.rs b/builder/src/core/node.rs
index 5357d639a28..97eb8d821de 100644
--- a/builder/src/core/node.rs
+++ b/builder/src/core/node.rs
@@ -6,7 +6,7 @@
 use std::ffi::{OsStr, OsString};
 use std::fmt::{self, Display, Formatter, Result as FmtResult};
 use std::fs::{self, File};
-use std::io::{Read, Write};
+use std::io::{Read, Seek, Write};
 use std::ops::Deref;
 #[cfg(target_os = "linux")]
 use std::os::linux::fs::MetadataExt;
@@ -462,6 +462,19 @@ impl Node {
             chunk.set_compressed(is_compressed);
         }
 
+        if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
+            let mut guard = blob_cache.lock().unwrap();
+            let write = guard.get_blob_data_writer_mut();
+            let curr_pos = write.seek(std::io::SeekFrom::End(0))?;
+            if curr_pos < chunk.uncompressed_offset() + aligned_d_size as u64 {
+                write.set_len(chunk.uncompressed_offset() + aligned_d_size as u64)?;
+            }
+
+            write.seek(std::io::SeekFrom::Start(chunk.uncompressed_offset()))?;
+            write
+                .write_all(&chunk_data)
+                .context("failed to write blob cache")?;
+        }
         event_tracer!("blob_uncompressed_size", +d_size);
 
         Ok(chunk_info)
diff --git a/builder/src/lib.rs b/builder/src/lib.rs
index d2409a16bc0..e65b7a99ac6 100644
--- a/builder/src/lib.rs
+++ b/builder/src/lib.rs
@@ -26,8 +26,8 @@ pub use self::compact::BlobCompactor;
 pub use self::core::bootstrap::Bootstrap;
 pub use self::core::chunk_dict::{parse_chunk_dict_arg, ChunkDict, HashChunkDict};
 pub use self::core::context::{
-    ArtifactStorage, ArtifactWriter, BlobContext, BlobManager, BootstrapContext, BootstrapManager,
-    BuildContext, BuildOutput, ConversionType,
+    ArtifactFileWriter, ArtifactStorage, ArtifactWriter, BlobCacheGenerator, BlobContext,
+    BlobManager, BootstrapContext, BootstrapManager, BuildContext, BuildOutput, ConversionType,
 };
 pub use self::core::feature::{Feature, Features};
 pub use self::core::node::{ChunkSource, NodeChunk};
@@ -237,6 +237,11 @@ fn finalize_blob(
         // blob file.
         if !is_tarfs {
             blob_writer.finalize(Some(blob_meta_id))?;
+            if let Some(blob_cache) = ctx.blob_cache_generator.as_ref() {
+                let mut guard = blob_cache.lock().unwrap();
+                guard.finalize(&blob_ctx.blob_id)?;
+                drop(guard);
+            }
         }
     }
diff --git a/smoke/tests/blobcache_test.go b/smoke/tests/blobcache_test.go
new file mode 100644
index 00000000000..1aef9c6e097
--- /dev/null
+++ b/smoke/tests/blobcache_test.go
@@ -0,0 +1,120 @@
+package tests
+
+import (
+	"fmt"
+	"io"
+	"io/fs"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/containerd/containerd/log"
+	"github.com/dragonflyoss/image-service/smoke/tests/texture"
+	"github.com/dragonflyoss/image-service/smoke/tests/tool"
+	"github.com/dragonflyoss/image-service/smoke/tests/tool/test"
+	"github.com/opencontainers/go-digest"
+	"github.com/stretchr/testify/require"
+)
+
+type BlobCacheTestSuite struct {
+	T *testing.T
+}
+
+func (a *BlobCacheTestSuite) compareTwoFiles(t *testing.T, left, right string) {
+	lf, err := os.Open(left)
+	require.NoError(t, err)
+	defer lf.Close()
+	leftDigester, err := digest.FromReader(lf)
+	require.NoError(t, err)
+
+	rf, err := os.Open(right)
+	require.NoError(t, err)
+	defer rf.Close()
+	rightDigester, err := digest.FromReader(rf)
+	require.NoError(t, err)
+
+	require.Equal(t, leftDigester, rightDigester)
+}
+
+func (a *BlobCacheTestSuite) TestGenerateBlobcache(t *testing.T) {
+	ctx := tool.DefaultContext(t)
+
+	ctx.PrepareWorkDir(t)
+	defer ctx.Destroy(t)
+
+	rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs"))
+
+	rootfsReader := rootFs.ToOCITar(t)
+
+	ociBlobDigester := digest.Canonical.Digester()
+	ociBlob, err := ioutil.TempFile(ctx.Env.BlobDir, "oci-blob-")
+	require.NoError(t, err)
+
+	_, err = io.Copy(io.MultiWriter(ociBlobDigester.Hash(), ociBlob), rootfsReader)
+	require.NoError(t, err)
+
+	ociBlobDigest := ociBlobDigester.Digest()
+	err = os.Rename(ociBlob.Name(), filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex()))
+	require.NoError(t, err)
+
+	// Used to generate blob.data and blob.meta.
+	blobcacheDir := filepath.Join(ctx.Env.WorkDir, "blobcache")
+	err = os.MkdirAll(blobcacheDir, 0755)
+	require.NoError(t, err)
+
+	ctx.Env.BootstrapPath = filepath.Join(ctx.Env.WorkDir, "bootstrap")
+	tool.Run(t, fmt.Sprintf("%s create -t targz-ref --bootstrap %s --blob-dir %s --blob-cache-dir %s %s",
+		ctx.Binary.Builder, ctx.Env.BootstrapPath, ctx.Env.BlobDir, blobcacheDir,
+		filepath.Join(ctx.Env.BlobDir, ociBlobDigest.Hex())))
+
+	nydusd, err := tool.NewNydusd(tool.NydusdConfig{
+		NydusdPath:      ctx.Binary.Nydusd,
+		BootstrapPath:   ctx.Env.BootstrapPath,
+		ConfigPath:      filepath.Join(ctx.Env.WorkDir, "nydusd-config.fusedev.json"),
+		MountPath:       ctx.Env.MountDir,
+		APISockPath:     filepath.Join(ctx.Env.WorkDir, "nydusd-api.sock"),
+		BackendType:     "localfs",
+		BackendConfig:   fmt.Sprintf(`{"dir": "%s"}`, ctx.Env.BlobDir),
+		EnablePrefetch:  ctx.Runtime.EnablePrefetch,
+		BlobCacheDir:    ctx.Env.CacheDir,
+		CacheType:       ctx.Runtime.CacheType,
+		CacheCompressed: ctx.Runtime.CacheCompressed,
+		RafsMode:        ctx.Runtime.RafsMode,
+		DigestValidate:  false,
+	})
+	require.NoError(t, err)
+
+	err = nydusd.Mount()
+	require.NoError(t, err)
+	defer func() {
+		if err := nydusd.Umount(); err != nil {
+			log.L.WithError(err).Errorf("umount")
+		}
+	}()
+
+	// Make sure the blob cache is ready.
+	err = filepath.WalkDir(ctx.Env.MountDir, func(path string, entry fs.DirEntry, err error) error {
+		require.Nil(t, err)
+		if entry.Type().IsRegular() {
+			// Read through the mountpoint to populate the blob cache.
+			_, err := os.ReadFile(path)
+			require.NoError(t, err)
+		}
+		return nil
+	})
+	require.NoError(t, err)
+
+	a.compareTwoFiles(t, filepath.Join(blobcacheDir, fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.data", ociBlobDigest.Hex())))
+	a.compareTwoFiles(t, filepath.Join(blobcacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())), filepath.Join(ctx.Env.CacheDir, fmt.Sprintf("%s.blob.meta", ociBlobDigest.Hex())))
+}
+
+func TestBlobCache(t *testing.T) {
+	test.Run(t, &BlobCacheTestSuite{T: t})
+}
\ No newline at end of file
diff --git a/smoke/tests/tool/layer.go b/smoke/tests/tool/layer.go
index 7902d5d38ae..452c6584f65 100644
--- a/smoke/tests/tool/layer.go
+++ b/smoke/tests/tool/layer.go
@@ -116,7 +116,7 @@ func (l *Layer) TargetPath(t *testing.T, path string) string {
 
 func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir string) digest.Digest {
 	// Output OCI tar stream
-	ociTar := l.toOCITar(t)
+	ociTar := l.ToOCITar(t)
 	defer ociTar.Close()
 
 	l.recordFileTree(t)
@@ -141,7 +141,7 @@ func (l *Layer) Pack(t *testing.T, packOption converter.PackOption, blobDir stri
 
 func (l *Layer) PackRef(t *testing.T, ctx Context, blobDir string, compress bool) (digest.Digest, digest.Digest) {
 	// Output OCI tar stream
-	ociTar := l.toOCITar(t)
+	ociTar := l.ToOCITar(t)
 	defer ociTar.Close()
 
 	l.recordFileTree(t)
@@ -238,7 +238,7 @@ func (l *Layer) recordFileTree(t *testing.T) {
 	})
 }
 
-func (l *Layer) toOCITar(t *testing.T) io.ReadCloser {
+func (l *Layer) ToOCITar(t *testing.T) io.ReadCloser {
 	return archive.Diff(context.Background(), "", l.workDir)
 }
 
diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs
index 1767bd736e3..2a73474c002 100644
--- a/src/bin/nydus-image/main.rs
+++ b/src/bin/nydus-image/main.rs
@@ -27,9 +27,10 @@ use nix::unistd::{getegid, geteuid};
 use nydus::{get_build_time_info, setup_logging};
 use nydus_api::{BuildTimeInfo, ConfigV2, LocalFsConfig};
 use nydus_builder::{
-    parse_chunk_dict_arg, ArtifactStorage, BlobCompactor, BlobManager, BootstrapManager,
-    BuildContext, BuildOutput, Builder, ConversionType, DirectoryBuilder, Feature, Features,
-    HashChunkDict, Merger, Prefetch, PrefetchPolicy, StargzBuilder, TarballBuilder, WhiteoutSpec,
+    parse_chunk_dict_arg, ArtifactStorage, BlobCacheGenerator, BlobCompactor, BlobManager,
+    BootstrapManager, BuildContext, BuildOutput, Builder, ConversionType, DirectoryBuilder,
+    Feature, Features, HashChunkDict, Merger, Prefetch, PrefetchPolicy, StargzBuilder,
+    TarballBuilder, WhiteoutSpec,
 };
 use nydus_rafs::metadata::{RafsSuper, RafsSuperConfig, RafsVersion};
 use nydus_storage::backend::localfs::LocalFs;
@@ -356,6 +357,13 @@ fn prepare_cmd_args(bti_string: &'static str) -> App {
                 .action(ArgAction::SetTrue)
                 .required(false)
             )
+            .arg(
+                Arg::new("blob-cache-dir")
+                    .long("blob-cache-dir")
+                    .help("Directory path to save generated blob cache files (blob meta and blob data)")
+                    .value_parser(clap::value_parser!(PathBuf))
+                    .required(false)
+            )
         );
 
     let app = app.subcommand(
@@ -801,6 +809,7 @@ impl Command {
         let version = Self::get_fs_version(matches)?;
         let chunk_size = Self::get_chunk_size(matches, conversion_type)?;
         let batch_size = Self::get_batch_size(matches, version, conversion_type, chunk_size)?;
+        let blob_cache_storage = Self::get_blob_cache_storage(matches, conversion_type)?;
         let aligned_chunk = if version.is_v6() && conversion_type != ConversionType::TarToTarfs {
             true
         } else {
@@ -1041,6 +1050,12 @@ impl Command {
         build_ctx.set_chunk_size(chunk_size);
         build_ctx.set_batch_size(batch_size);
 
+        let blob_cache_generator = match blob_cache_storage {
+            Some(storage) => Some(Mutex::new(BlobCacheGenerator::new(storage)?)),
+            None => None,
+        };
+        build_ctx.blob_cache_generator = blob_cache_generator;
+
         let mut config = Self::get_configuration(matches)?;
         if let Some(cache) = Arc::get_mut(&mut config).unwrap().cache.as_mut() {
             cache.cache_validate = true;
@@ -1479,6 +1494,31 @@ impl Command {
         }
     }
 
+    fn get_blob_cache_storage(
+        matches: &ArgMatches,
+        conversion_type: ConversionType,
+    ) -> Result<Option<ArtifactStorage>> {
+        if let Some(p) = matches.get_one::<PathBuf>("blob-cache-dir") {
+            if conversion_type == ConversionType::TarToTarfs
+                || conversion_type == ConversionType::EStargzIndexToRef
+                || conversion_type == ConversionType::EStargzToRafs
+                || conversion_type == ConversionType::EStargzToRef
+            {
+                bail!(
+                    "conversion type `{}` conflicts with `--blob-cache-dir`",
+                    conversion_type
+                );
+            }
+
+            if !p.exists() {
+                bail!("directory to store blob cache does not exist")
+            }
+            Ok(Some(ArtifactStorage::FileDir(p.to_owned())))
+        } else {
+            Ok(None)
+        }
+    }
+
     // Must specify a path to blob file.
     // For cli/binary interface compatibility sake, keep option `backend-config`, but
     // it only receives "localfs" backend type and it will be REMOVED in the future
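
A usage sketch, not part of the patch itself (paths and the OCI blob digest are placeholders, mirroring the smoke test's invocation above):

  # Build a RAFS bootstrap and, at the same time, a raw blob cache:
  nydus-image create -t targz-ref \
      --bootstrap ./work/bootstrap \
      --blob-dir ./work/blobs \
      --blob-cache-dir ./work/blobcache \
      ./work/blobs/<oci-blob-sha256>

  # The builder emits two files named after the blob id:
  #   ./work/blobcache/<blob-id>.blob.data  - uncompressed chunk data
  #   ./work/blobcache/<blob-id>.blob.meta  - chunk-info array plus the
  #                                           compression context header
  # Pointing the cache directory of a localfs-backed nydusd at these files
  # lets it serve reads without first downloading the blob from the backend.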